Coverage for manila/tests/api/v2/test_security_service.py: 100%

235 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2012 NetApp 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from unittest import mock 

17from urllib import parse 

18 

19import ddt 

20import webob 

21 

22from manila.api.v2 import security_service 

23from manila.common import constants 

24from manila import db 

25from manila import exception 

26from manila import policy 

27from manila import test 

28from manila.tests.api import fakes 

29 

30 

31@ddt.ddt 

32class ShareApiTest(test.TestCase): 

33 """Share Api Test.""" 

34 def setUp(self): 

35 super(ShareApiTest, self).setUp() 

36 self.controller = security_service.SecurityServiceController() 

37 self.maxDiff = None 

38 self.ss_active_directory = { 

39 "created_at": "fake-time", 

40 "updated_at": "fake-time-2", 

41 "id": 1, 

42 "name": "fake-name", 

43 "description": "Fake Security Service Desc", 

44 "type": constants.SECURITY_SERVICES_ALLOWED_TYPES[0], 

45 "dns_ip": "1.1.1.1", 

46 "server": "fake-server", 

47 "domain": "fake-domain", 

48 "user": "fake-user", 

49 "password": "fake-password", 

50 "status": constants.STATUS_NEW, 

51 "project_id": "fake", 

52 } 

53 self.ss_ldap = { 

54 "created_at": "fake-time", 

55 "updated_at": "fake-time-2", 

56 "id": 2, 

57 "name": "ss-ldap", 

58 "description": "Fake Security Service Desc", 

59 "type": constants.SECURITY_SERVICES_ALLOWED_TYPES[1], 

60 "dns_ip": "2.2.2.2", 

61 "server": "test-server", 

62 "domain": "test-domain", 

63 "user": "test-user", 

64 "password": "test-password", 

65 "status": "active", 

66 "project_id": "fake", 

67 } 

68 self.valid_search_opts = { 

69 'user': 'fake-user', 

70 'server': 'fake-server', 

71 'dns_ip': '1.1.1.1', 

72 'domain': 'fake-domain', 

73 'type': constants.SECURITY_SERVICES_ALLOWED_TYPES[0], 

74 } 

75 self.check_policy_patcher = mock.patch( 

76 'manila.api.v2.security_service.policy.check_policy') 

77 self.check_policy_patcher.start() 

78 self.addCleanup(self._stop_started_patcher, self.check_policy_patcher) 

79 self.security_service_list_expected_resp = { 

80 'security_services': [{ 

81 'id': self.ss_active_directory['id'], 

82 'name': self.ss_active_directory['name'], 

83 'type': self.ss_active_directory['type'], 

84 'status': self.ss_active_directory['status'] 

85 }, ] 

86 } 

87 

88 self.fake_share_network_list_with_share_servers = [{ 

89 'id': 'fake_sn_id', 

90 'share_network_subnets': [{ 

91 'id': 'fake_sns_id', 

92 'share_servers': [{'id': 'fake_ss_id'}] 

93 }] 

94 }] 

95 self.fake_share_network_list_without_share_servers = [{ 

96 'id': 'fake_sn_id', 

97 'share_network_subnets': [{ 

98 'id': 'fake_sns_id', 

99 'share_servers': [] 

100 }] 

101 }] 

102 

103 def _stop_started_patcher(self, patcher): 

104 if hasattr(patcher, 'is_local'): 

105 patcher.stop() 

106 

107 def test_security_service_show(self): 

108 db.security_service_get = mock.Mock( 

109 return_value=self.ss_active_directory) 

110 req = fakes.HTTPRequest.blank('/security-services/1') 

111 res_dict = self.controller.show(req, '1') 

112 expected = self.ss_active_directory.copy() 

113 expected.update() 

114 self.assertEqual({'security_service': self.ss_active_directory}, 

115 res_dict) 

116 

117 def test_security_service_show_not_found(self): 

118 db.security_service_get = mock.Mock(side_effect=exception.NotFound) 

119 req = fakes.HTTPRequest.blank('/shares/1') 

120 self.assertRaises(webob.exc.HTTPNotFound, 

121 self.controller.show, 

122 req, '1') 

123 

124 def test_security_service_create(self): 

125 sec_service = self.ss_active_directory.copy() 

126 create_stub = mock.Mock( 

127 return_value=sec_service) 

128 self.mock_object(db, 'security_service_create', create_stub) 

129 

130 req = fakes.HTTPRequest.blank('/security-services') 

131 res_dict = self.controller.create( 

132 req, {"security_service": sec_service}) 

133 expected = self.ss_active_directory.copy() 

134 self.assertEqual({'security_service': expected}, res_dict) 

135 

136 def test_security_service_create_invalid_types(self): 

137 sec_service = self.ss_active_directory.copy() 

138 sec_service['type'] = 'invalid' 

139 req = fakes.HTTPRequest.blank('/security-services') 

140 self.assertRaises(exception.InvalidInput, self.controller.create, req, 

141 {"security_service": sec_service}) 

142 

143 @ddt.data('2.76') 

144 def test_security_service_create_invalid_active_directory(self, version): 

145 sec_service = self.ss_active_directory.copy() 

146 sec_service['default_ad_site'] = 'fake_default_ad_site' 

147 req = fakes.HTTPRequest.blank('/security-services', version=version) 

148 self.assertRaises(exception.InvalidInput, self.controller.create, req, 

149 {"security_service": sec_service}) 

150 

151 def test_create_security_service_no_body(self): 

152 body = {} 

153 req = fakes.HTTPRequest.blank('/security-services') 

154 self.assertRaises(webob.exc.HTTPUnprocessableEntity, 

155 self.controller.create, 

156 req, 

157 body) 

158 

159 def test_security_service_delete(self): 

160 db.security_service_delete = mock.Mock() 

161 db.security_service_get = mock.Mock() 

162 db.share_network_get_all_by_security_service = mock.Mock( 

163 return_value=[]) 

164 req = fakes.HTTPRequest.blank('/security_services/1') 

165 resp = self.controller.delete(req, 1) 

166 db.security_service_delete.assert_called_once_with( 

167 req.environ['manila.context'], 1) 

168 self.assertEqual(202, resp.status_int) 

169 

170 def test_security_service_delete_not_found(self): 

171 db.security_service_get = mock.Mock(side_effect=exception.NotFound) 

172 req = fakes.HTTPRequest.blank('/security_services/1') 

173 self.assertRaises(webob.exc.HTTPNotFound, 

174 self.controller.delete, 

175 req, 

176 1) 

177 

178 def test_security_service_delete_has_share_networks(self): 

179 db.security_service_get = mock.Mock() 

180 db.share_network_get_all_by_security_service = mock.Mock( 

181 return_value=[{'share_network': 'fake_share_network'}]) 

182 req = fakes.HTTPRequest.blank('/security_services/1') 

183 self.assertRaises(webob.exc.HTTPForbidden, self.controller.delete, 

184 req, 1) 

185 

186 def test_security_service_update_name(self): 

187 new = self.ss_active_directory.copy() 

188 updated = self.ss_active_directory.copy() 

189 updated['name'] = 'new' 

190 self.mock_object(security_service.policy, 'check_policy') 

191 db.security_service_get = mock.Mock(return_value=new) 

192 db.security_service_update = mock.Mock(return_value=updated) 

193 fake_sns = {'id': 'fake_sns_id', 'share_servers': ['fake_ss']} 

194 db.share_network_get_all_by_security_service = mock.Mock( 

195 return_value=[{ 

196 'id': 'fake_id', 

197 'share_network_subnets': [fake_sns] 

198 }]) 

199 body = {"security_service": {"name": "new"}} 

200 req = fakes.HTTPRequest.blank('/security_service/1') 

201 res_dict = self.controller.update(req, 1, body)['security_service'] 

202 self.assertEqual(updated['name'], res_dict['name']) 

203 db.share_network_get_all_by_security_service.assert_called_once_with( 

204 req.environ['manila.context'], 1) 

205 self.assertEqual(2, security_service.policy.check_policy.call_count) 

206 security_service.policy.check_policy.assert_has_calls([ 

207 mock.call(req.environ['manila.context'], 

208 security_service.RESOURCE_NAME, 'update', new) 

209 ]) 

210 

211 def test_security_service_update_description(self): 

212 new = self.ss_active_directory.copy() 

213 updated = self.ss_active_directory.copy() 

214 updated['description'] = 'new' 

215 self.mock_object(security_service.policy, 'check_policy') 

216 db.security_service_get = mock.Mock(return_value=new) 

217 db.security_service_update = mock.Mock(return_value=updated) 

218 fake_sns = {'id': 'fake_sns_id', 'share_servers': ['fake_ss']} 

219 db.share_network_get_all_by_security_service = mock.Mock( 

220 return_value=[{ 

221 'id': 'fake_id', 

222 'share_network_subnets': [fake_sns] 

223 }]) 

224 body = {"security_service": {"description": "new"}} 

225 req = fakes.HTTPRequest.blank('/security_service/1') 

226 res_dict = self.controller.update(req, 1, body)['security_service'] 

227 self.assertEqual(updated['description'], res_dict['description']) 

228 db.share_network_get_all_by_security_service.assert_called_once_with( 

229 req.environ['manila.context'], 1) 

230 self.assertEqual(2, security_service.policy.check_policy.call_count) 

231 security_service.policy.check_policy.assert_has_calls([ 

232 mock.call(req.environ['manila.context'], 

233 security_service.RESOURCE_NAME, 'update', new) 

234 ]) 

235 

236 @mock.patch.object(db, 'security_service_get', mock.Mock()) 

237 @mock.patch.object(db, 'share_network_get_all_by_security_service', 

238 mock.Mock()) 

239 def test_security_service_update_invalid_keys_sh_server_exists(self): 

240 self.mock_object(security_service.policy, 'check_policy') 

241 fake_sns = {'id': 'fake_sns_id', 'share_servers': ['fake_ss']} 

242 db.share_network_get_all_by_security_service.return_value = [ 

243 {'id': 'fake_id', 'share_network_subnets': [fake_sns]}, 

244 ] 

245 db.security_service_get.return_value = self.ss_active_directory.copy() 

246 body = {'security_service': {'user_id': 'new_user'}} 

247 req = fakes.HTTPRequest.blank('/security_services/1') 

248 self.assertRaises(webob.exc.HTTPForbidden, self.controller.update, 

249 req, 1, body) 

250 db.security_service_get.assert_called_once_with( 

251 req.environ['manila.context'], 1) 

252 db.share_network_get_all_by_security_service.assert_called_once_with( 

253 req.environ['manila.context'], 1) 

254 self.assertEqual(1, security_service.policy.check_policy.call_count) 

255 security_service.policy.check_policy.assert_has_calls([ 

256 mock.call(req.environ['manila.context'], 

257 security_service.RESOURCE_NAME, 'update', 

258 db.security_service_get.return_value) 

259 ]) 

260 

261 @mock.patch.object(db, 'security_service_get', mock.Mock()) 

262 @mock.patch.object(db, 'security_service_update', mock.Mock()) 

263 @mock.patch.object(db, 'share_network_get_all_by_security_service', 

264 mock.Mock()) 

265 def test_security_service_update_valid_keys_sh_server_exists(self): 

266 self.mock_object(security_service.policy, 'check_policy') 

267 fake_sns = {'id': 'fake_sns_id', 'share_servers': ['fake_ss']} 

268 db.share_network_get_all_by_security_service.return_value = [ 

269 {'id': 'fake_id', 'share_network_subnets': [fake_sns]}, 

270 ] 

271 old = self.ss_active_directory.copy() 

272 updated = self.ss_active_directory.copy() 

273 updated['name'] = 'new name' 

274 updated['description'] = 'new description' 

275 db.security_service_get.return_value = old 

276 db.security_service_update.return_value = updated 

277 body = { 

278 'security_service': { 

279 'description': 'new description', 

280 'name': 'new name', 

281 }, 

282 } 

283 req = fakes.HTTPRequest.blank('/security_services/1') 

284 res_dict = self.controller.update(req, 1, body)['security_service'] 

285 self.assertEqual(updated['description'], res_dict['description']) 

286 self.assertEqual(updated['name'], res_dict['name']) 

287 db.security_service_get.assert_called_once_with( 

288 req.environ['manila.context'], 1) 

289 db.share_network_get_all_by_security_service.assert_called_once_with( 

290 req.environ['manila.context'], 1) 

291 db.security_service_update.assert_called_once_with( 

292 req.environ['manila.context'], 1, body['security_service']) 

293 self.assertEqual(2, security_service.policy.check_policy.call_count) 

294 security_service.policy.check_policy.assert_has_calls([ 

295 mock.call(req.environ['manila.context'], 

296 security_service.RESOURCE_NAME, 'update', old) 

297 ]) 

298 

299 @mock.patch.object(db, 'security_service_get', mock.Mock()) 

300 def test_security_service_update_has_share_servers(self): 

301 db.security_service_get = mock.Mock() 

302 self.mock_object( 

303 self.controller, '_share_servers_dependent_on_sn_exist', 

304 mock.Mock(return_value=True)) 

305 body = {"security_service": {"type": "ldap"}} 

306 

307 req = fakes.HTTPRequest.blank('/security_services/1') 

308 self.assertRaises(webob.exc.HTTPForbidden, 

309 self.controller.update, 

310 req, 

311 1, 

312 body) 

313 

314 @ddt.data(True, False) 

315 def test_security_service_update_share_server_dependent_exists(self, 

316 expected): 

317 req = fakes.HTTPRequest.blank('/security_services/1') 

318 context = req.environ['manila.context'] 

319 db.security_service_get = mock.Mock() 

320 network = (self.fake_share_network_list_with_share_servers if expected 

321 else self.fake_share_network_list_without_share_servers) 

322 db.share_network_get_all_by_security_service = mock.Mock( 

323 return_value=network) 

324 

325 result = self.controller._share_servers_dependent_on_sn_exist( 

326 context, 'fake_id') 

327 self.assertEqual(expected, result) 

328 

329 def test_security_service_list(self): 

330 db.security_service_get_all_by_project = mock.Mock( 

331 return_value=[self.ss_active_directory.copy()]) 

332 req = fakes.HTTPRequest.blank('/security_services') 

333 res_dict = self.controller.index(req) 

334 self.assertEqual(self.security_service_list_expected_resp, res_dict) 

335 

336 @mock.patch.object(db, 'share_network_get', mock.Mock()) 

337 def test_security_service_list_filter_by_sn(self): 

338 sn = { 

339 'id': 'fake_sn_id', 

340 'security_services': [self.ss_active_directory, ], 

341 } 

342 db.share_network_get.return_value = sn 

343 req = fakes.HTTPRequest.blank( 

344 '/security-services?share_network_id=fake_sn_id') 

345 res_dict = self.controller.index(req) 

346 self.assertEqual(self.security_service_list_expected_resp, res_dict) 

347 db.share_network_get.assert_called_once_with( 

348 req.environ['manila.context'], 

349 sn['id']) 

350 

351 @mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock()) 

352 @mock.patch.object(db, 'security_service_get_all', mock.Mock()) 

353 def test_security_services_list_all_tenants_policy_authorized(self): 

354 self.check_policy_patcher.stop() 

355 db.security_service_get_all.return_value = [ 

356 self.ss_active_directory, 

357 self.ss_ldap, 

358 ] 

359 req = fakes.HTTPRequest.blank( 

360 '/security-services?all_tenants=1') 

361 self.mock_object(policy, "check_policy", mock.Mock(return_value=True)) 

362 fake_context = req.environ['manila.context'] 

363 self.controller.index(req) 

364 db.security_service_get_all_by_project.assert_not_called() 

365 db.security_service_get_all.assert_called_once_with(fake_context) 

366 

367 @mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock()) 

368 @mock.patch.object(db, 'security_service_get_all', mock.Mock()) 

369 def test_security_services_list_all_tenants_policy_not_authorized(self): 

370 self.check_policy_patcher.stop() 

371 db.security_service_get_all.return_value = [ 

372 self.ss_active_directory, 

373 self.ss_ldap, 

374 ] 

375 req = fakes.HTTPRequest.blank( 

376 '/security-services?all_tenants=1') 

377 self.mock_object(policy, 

378 "check_policy", 

379 mock.Mock(side_effect=exception.NotAuthorized())) 

380 self.assertRaises(exception.NotAuthorized, self.controller.index, req) 

381 db.security_service_get_all_by_project.assert_not_called() 

382 db.security_service_get_all.assert_not_called() 

383 

384 @mock.patch.object(db, 'security_service_get_all', mock.Mock()) 

385 def test_security_services_list_all_tenants_with_invalid_value(self): 

386 req = fakes.HTTPRequest.blank( 

387 '/security-services?all_tenants=nerd', 

388 use_admin_context=True) 

389 self.assertRaises(exception.InvalidInput, self.controller.index, req) 

390 

391 @mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock()) 

392 def test_security_services_list_all_tenants_with_value_zero(self): 

393 db.security_service_get_all_by_project.return_value = [] 

394 req = fakes.HTTPRequest.blank( 

395 '/security-services?all_tenants=0', 

396 use_admin_context=True) 

397 res_dict = self.controller.index(req) 

398 self.assertEqual({'security_services': []}, res_dict) 

399 db.security_service_get_all_by_project.assert_called_once_with( 

400 req.environ['manila.context'], 

401 req.environ['manila.context'].project_id) 

402 

403 @mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock()) 

404 def test_security_services_list_admin_context_invalid_opts(self): 

405 db.security_service_get_all_by_project.return_value = [ 

406 self.ss_active_directory, 

407 self.ss_ldap, 

408 ] 

409 req = fakes.HTTPRequest.blank( 

410 '/security-services?fake_opt=fake_value', 

411 use_admin_context=True) 

412 res_dict = self.controller.index(req) 

413 self.assertEqual({'security_services': []}, res_dict) 

414 db.security_service_get_all_by_project.assert_called_once_with( 

415 req.environ['manila.context'], 

416 req.environ['manila.context'].project_id) 

417 

418 @mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock()) 

419 def test_security_service_list_all_filter_opts_separately(self): 

420 db.security_service_get_all_by_project.return_value = [ 

421 self.ss_active_directory, 

422 self.ss_ldap, 

423 ] 

424 for opt, val in self.valid_search_opts.items(): 

425 for use_admin_context in [True, False]: 

426 req = fakes.HTTPRequest.blank( 

427 '/security-services?' + opt + '=' + val, 

428 use_admin_context=use_admin_context) 

429 res_dict = self.controller.index(req) 

430 self.assertEqual(self.security_service_list_expected_resp, 

431 res_dict) 

432 db.security_service_get_all_by_project.assert_called_with( 

433 req.environ['manila.context'], 

434 req.environ['manila.context'].project_id) 

435 

436 @mock.patch.object(db, 'security_service_get_all_by_project', mock.Mock()) 

437 def test_security_service_list_all_filter_opts(self): 

438 db.security_service_get_all_by_project.return_value = [ 

439 self.ss_active_directory, 

440 self.ss_ldap, 

441 ] 

442 query_string = '/security-services?' + parse.urlencode(sorted( 

443 [(k, v) for (k, v) in list(self.valid_search_opts.items())])) 

444 for use_admin_context in [True, False]: 

445 req = fakes.HTTPRequest.blank(query_string, 

446 use_admin_context=use_admin_context) 

447 res_dict = self.controller.index(req) 

448 self.assertEqual(self.security_service_list_expected_resp, 

449 res_dict) 

450 db.security_service_get_all_by_project.assert_called_with( 

451 req.environ['manila.context'], 

452 req.environ['manila.context'].project_id)