Coverage for manila/api/v2/security_service.py: 89%

138 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2014 Mirantis Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16"""The security service api.""" 

17 

18from http import client as http_client 

19 

20from oslo_log import log 

21import webob 

22from webob import exc 

23 

24from manila.api import common 

25from manila.api.openstack import api_version_request as api_version 

26from manila.api.openstack import wsgi 

27from manila.api.views import security_service as security_service_views 

28from manila.common import constants 

29from manila import db 

30from manila import exception 

31from manila.i18n import _ 

32from manila import policy 

33from manila import utils 

34 

35 

36RESOURCE_NAME = 'security_service' 

37LOG = log.getLogger(__name__) 

38 

39 

40class SecurityServiceController(wsgi.Controller): 

41 """The Shares API controller for the OpenStack API.""" 

42 

43 _view_builder_class = security_service_views.ViewBuilder 

44 

45 def show(self, req, id): 

46 """Return data about the given security service.""" 

47 context = req.environ['manila.context'] 

48 try: 

49 security_service = db.security_service_get(context, id) 

50 policy.check_policy(context, RESOURCE_NAME, 'show', 

51 security_service) 

52 except exception.NotFound: 

53 raise exc.HTTPNotFound() 

54 

55 return self._view_builder.detail(req, security_service) 

56 

57 def delete(self, req, id): 

58 """Delete a security service.""" 

59 context = req.environ['manila.context'] 

60 

61 LOG.info("Delete security service with id: %s", 

62 id, context=context) 

63 

64 try: 

65 security_service = db.security_service_get(context, id) 

66 except exception.NotFound: 

67 raise exc.HTTPNotFound() 

68 

69 share_nets = db.share_network_get_all_by_security_service( 

70 context, id) 

71 if share_nets: 

72 msg = _("Cannot delete security service. It is " 

73 "assigned to share network(s)") 

74 raise exc.HTTPForbidden(explanation=msg) 

75 policy.check_policy(context, RESOURCE_NAME, 

76 'delete', security_service) 

77 db.security_service_delete(context, id) 

78 

79 return webob.Response(status_int=http_client.ACCEPTED) 

80 

81 def index(self, req): 

82 """Returns a summary list of security services.""" 

83 policy.check_policy(req.environ['manila.context'], RESOURCE_NAME, 

84 'index') 

85 return self._get_security_services(req, is_detail=False) 

86 

87 def detail(self, req): 

88 """Returns a detailed list of security services.""" 

89 policy.check_policy(req.environ['manila.context'], RESOURCE_NAME, 

90 'detail') 

91 return self._get_security_services(req, is_detail=True) 

92 

93 def _get_security_services(self, req, is_detail): 

94 """Returns a transformed list of security services. 

95 

96 The list gets transformed through view builder. 

97 """ 

98 context = req.environ['manila.context'] 

99 

100 search_opts = {} 

101 search_opts.update(req.GET) 

102 

103 # NOTE(vponomaryov): remove 'status' from search opts 

104 # since it was removed from security service model. 

105 search_opts.pop('status', None) 

106 if 'share_network_id' in search_opts: 

107 share_nw = db.share_network_get(context, 

108 search_opts['share_network_id']) 

109 security_services = share_nw['security_services'] 

110 del search_opts['share_network_id'] 

111 else: 

112 # ignore all_tenants if not authorized to use it. 

113 security_services = None 

114 if utils.is_all_tenants(search_opts): 

115 allowed_to_list_all_tenants = policy.check_policy( 

116 context, RESOURCE_NAME, 'get_all_security_services', 

117 do_raise=False) 

118 if allowed_to_list_all_tenants: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true

119 security_services = db.security_service_get_all(context) 

120 if security_services is None: 

121 security_services = db.security_service_get_all_by_project( 

122 context, context.project_id) 

123 search_opts.pop('all_tenants', None) 

124 common.remove_invalid_options( 

125 context, 

126 search_opts, 

127 self._get_security_services_search_options()) 

128 if search_opts: 

129 results = [] 

130 not_found = object() 

131 for ss in security_services: 

132 if all(ss.get(opt, not_found) == value for opt, value in 

133 search_opts.items()): 

134 results.append(ss) 

135 security_services = results 

136 

137 limited_list = common.limited(security_services, req) 

138 

139 if is_detail: 

140 security_services = self._view_builder.detail_list( 

141 req, limited_list) 

142 for ss in security_services['security_services']: 

143 share_networks = db.share_network_get_all_by_security_service( 

144 context, 

145 ss['id']) 

146 ss['share_networks'] = [sn['id'] for sn in share_networks] 

147 else: 

148 security_services = self._view_builder.summary_list( 

149 req, limited_list) 

150 return security_services 

151 

152 def _get_security_services_search_options(self): 

153 return ('name', 'id', 'type', 'user', 

154 'server', 'dns_ip', 'domain', ) 

155 

156 def _share_servers_dependent_on_sn_exist(self, context, 

157 security_service_id): 

158 share_networks = db.share_network_get_all_by_security_service( 

159 context, security_service_id) 

160 

161 for sn in share_networks: 

162 for sns in sn['share_network_subnets']: 

163 if 'share_servers' in sns and sns['share_servers']: 

164 return True 

165 return False 

166 

167 def update(self, req, id, body): 

168 """Update a security service.""" 

169 context = req.environ['manila.context'] 

170 

171 if not body or 'security_service' not in body: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true

172 raise exc.HTTPUnprocessableEntity() 

173 

174 security_service_data = body['security_service'] 

175 valid_update_keys = ( 

176 'description', 

177 'name' 

178 ) 

179 

180 try: 

181 security_service = db.security_service_get(context, id) 

182 policy.check_policy(context, RESOURCE_NAME, 'update', 

183 security_service) 

184 except exception.NotFound: 

185 raise exc.HTTPNotFound() 

186 

187 if self._share_servers_dependent_on_sn_exist(context, id): 187 ↛ 196line 187 didn't jump to line 196 because the condition on line 187 was always true

188 for item in security_service_data: 

189 if item not in valid_update_keys: 

190 msg = _("Cannot update security service %s. It is " 

191 "attached to share network with share server " 

192 "associated. Only 'name' and 'description' " 

193 "fields are available for update.") % id 

194 raise exc.HTTPForbidden(explanation=msg) 

195 

196 server = security_service_data.get('server') 

197 default_ad_site = security_service_data.get('default_ad_site') 

198 if default_ad_site: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true

199 if req.api_version_request < api_version.APIVersionRequest("2.76"): 

200 msg = _('"default_ad_site" is only supported from API ' 

201 'version 2.76.') 

202 raise webob.exc.HTTPBadRequest(explanation=msg) 

203 

204 if (security_service['type'] == 'active_directory' and server and 

205 default_ad_site): 

206 raise exception.InvalidInput( 

207 reason=(_("Cannot create security service because both " 

208 "server and 'default_ad_site' were provided. " 

209 "Specify either server or 'default_ad_site'."))) 

210 

211 policy.check_policy(context, RESOURCE_NAME, 'update', security_service) 

212 security_service = db.security_service_update( 

213 context, id, security_service_data) 

214 return self._view_builder.detail(req, security_service) 

215 

216 def create(self, req, body): 

217 """Creates a new security service.""" 

218 context = req.environ['manila.context'] 

219 policy.check_policy(context, RESOURCE_NAME, 'create') 

220 

221 if not self.is_valid_body(body, 'security_service'): 

222 raise exc.HTTPUnprocessableEntity() 

223 

224 security_service_args = body['security_service'] 

225 security_srv_type = security_service_args.get('type') 

226 allowed_types = constants.SECURITY_SERVICES_ALLOWED_TYPES 

227 if security_srv_type not in allowed_types: 

228 raise exception.InvalidInput( 

229 reason=(_("Invalid type %(type)s specified for security " 

230 "service. Valid types are %(types)s") % 

231 {'type': security_srv_type, 

232 'types': ','.join(allowed_types)})) 

233 server = security_service_args.get('server') 

234 default_ad_site = security_service_args.get('default_ad_site') 

235 if default_ad_site: 

236 if req.api_version_request < api_version.APIVersionRequest("2.76"): 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true

237 msg = _('"default_ad_site" is only supported from API ' 

238 'version 2.76.') 

239 raise webob.exc.HTTPBadRequest(explanation=msg) 

240 

241 if (security_srv_type == 'active_directory' and server and 241 ↛ 247line 241 didn't jump to line 247 because the condition on line 241 was always true

242 default_ad_site): 

243 raise exception.InvalidInput( 

244 reason=(_("Cannot create security service because both " 

245 "server and 'default_ad_site' were provided, " 

246 "Specify either server or 'default_ad_site'."))) 

247 security_service_args['project_id'] = context.project_id 

248 security_service = db.security_service_create( 

249 context, security_service_args) 

250 

251 return self._view_builder.detail(req, security_service) 

252 

253 

def create_resource():
    """Build the WSGI resource wrapping a SecurityServiceController."""
    controller = SecurityServiceController()
    return wsgi.Resource(controller)