Coverage for manila/api/v2/security_service.py: 89%
138 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2014 Mirantis Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""The security service api."""
18from http import client as http_client
20from oslo_log import log
21import webob
22from webob import exc
24from manila.api import common
25from manila.api.openstack import api_version_request as api_version
26from manila.api.openstack import wsgi
27from manila.api.views import security_service as security_service_views
28from manila.common import constants
29from manila import db
30from manila import exception
31from manila.i18n import _
32from manila import policy
33from manila import utils
36RESOURCE_NAME = 'security_service'
37LOG = log.getLogger(__name__)
40class SecurityServiceController(wsgi.Controller):
41 """The Shares API controller for the OpenStack API."""
43 _view_builder_class = security_service_views.ViewBuilder
45 def show(self, req, id):
46 """Return data about the given security service."""
47 context = req.environ['manila.context']
48 try:
49 security_service = db.security_service_get(context, id)
50 policy.check_policy(context, RESOURCE_NAME, 'show',
51 security_service)
52 except exception.NotFound:
53 raise exc.HTTPNotFound()
55 return self._view_builder.detail(req, security_service)
57 def delete(self, req, id):
58 """Delete a security service."""
59 context = req.environ['manila.context']
61 LOG.info("Delete security service with id: %s",
62 id, context=context)
64 try:
65 security_service = db.security_service_get(context, id)
66 except exception.NotFound:
67 raise exc.HTTPNotFound()
69 share_nets = db.share_network_get_all_by_security_service(
70 context, id)
71 if share_nets:
72 msg = _("Cannot delete security service. It is "
73 "assigned to share network(s)")
74 raise exc.HTTPForbidden(explanation=msg)
75 policy.check_policy(context, RESOURCE_NAME,
76 'delete', security_service)
77 db.security_service_delete(context, id)
79 return webob.Response(status_int=http_client.ACCEPTED)
81 def index(self, req):
82 """Returns a summary list of security services."""
83 policy.check_policy(req.environ['manila.context'], RESOURCE_NAME,
84 'index')
85 return self._get_security_services(req, is_detail=False)
87 def detail(self, req):
88 """Returns a detailed list of security services."""
89 policy.check_policy(req.environ['manila.context'], RESOURCE_NAME,
90 'detail')
91 return self._get_security_services(req, is_detail=True)
93 def _get_security_services(self, req, is_detail):
94 """Returns a transformed list of security services.
96 The list gets transformed through view builder.
97 """
98 context = req.environ['manila.context']
100 search_opts = {}
101 search_opts.update(req.GET)
103 # NOTE(vponomaryov): remove 'status' from search opts
104 # since it was removed from security service model.
105 search_opts.pop('status', None)
106 if 'share_network_id' in search_opts:
107 share_nw = db.share_network_get(context,
108 search_opts['share_network_id'])
109 security_services = share_nw['security_services']
110 del search_opts['share_network_id']
111 else:
112 # ignore all_tenants if not authorized to use it.
113 security_services = None
114 if utils.is_all_tenants(search_opts):
115 allowed_to_list_all_tenants = policy.check_policy(
116 context, RESOURCE_NAME, 'get_all_security_services',
117 do_raise=False)
118 if allowed_to_list_all_tenants: 118 ↛ 120line 118 didn't jump to line 120 because the condition on line 118 was always true
119 security_services = db.security_service_get_all(context)
120 if security_services is None:
121 security_services = db.security_service_get_all_by_project(
122 context, context.project_id)
123 search_opts.pop('all_tenants', None)
124 common.remove_invalid_options(
125 context,
126 search_opts,
127 self._get_security_services_search_options())
128 if search_opts:
129 results = []
130 not_found = object()
131 for ss in security_services:
132 if all(ss.get(opt, not_found) == value for opt, value in
133 search_opts.items()):
134 results.append(ss)
135 security_services = results
137 limited_list = common.limited(security_services, req)
139 if is_detail:
140 security_services = self._view_builder.detail_list(
141 req, limited_list)
142 for ss in security_services['security_services']:
143 share_networks = db.share_network_get_all_by_security_service(
144 context,
145 ss['id'])
146 ss['share_networks'] = [sn['id'] for sn in share_networks]
147 else:
148 security_services = self._view_builder.summary_list(
149 req, limited_list)
150 return security_services
152 def _get_security_services_search_options(self):
153 return ('name', 'id', 'type', 'user',
154 'server', 'dns_ip', 'domain', )
156 def _share_servers_dependent_on_sn_exist(self, context,
157 security_service_id):
158 share_networks = db.share_network_get_all_by_security_service(
159 context, security_service_id)
161 for sn in share_networks:
162 for sns in sn['share_network_subnets']:
163 if 'share_servers' in sns and sns['share_servers']:
164 return True
165 return False
167 def update(self, req, id, body):
168 """Update a security service."""
169 context = req.environ['manila.context']
171 if not body or 'security_service' not in body: 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true
172 raise exc.HTTPUnprocessableEntity()
174 security_service_data = body['security_service']
175 valid_update_keys = (
176 'description',
177 'name'
178 )
180 try:
181 security_service = db.security_service_get(context, id)
182 policy.check_policy(context, RESOURCE_NAME, 'update',
183 security_service)
184 except exception.NotFound:
185 raise exc.HTTPNotFound()
187 if self._share_servers_dependent_on_sn_exist(context, id): 187 ↛ 196line 187 didn't jump to line 196 because the condition on line 187 was always true
188 for item in security_service_data:
189 if item not in valid_update_keys:
190 msg = _("Cannot update security service %s. It is "
191 "attached to share network with share server "
192 "associated. Only 'name' and 'description' "
193 "fields are available for update.") % id
194 raise exc.HTTPForbidden(explanation=msg)
196 server = security_service_data.get('server')
197 default_ad_site = security_service_data.get('default_ad_site')
198 if default_ad_site: 198 ↛ 199line 198 didn't jump to line 199 because the condition on line 198 was never true
199 if req.api_version_request < api_version.APIVersionRequest("2.76"):
200 msg = _('"default_ad_site" is only supported from API '
201 'version 2.76.')
202 raise webob.exc.HTTPBadRequest(explanation=msg)
204 if (security_service['type'] == 'active_directory' and server and
205 default_ad_site):
206 raise exception.InvalidInput(
207 reason=(_("Cannot create security service because both "
208 "server and 'default_ad_site' were provided. "
209 "Specify either server or 'default_ad_site'.")))
211 policy.check_policy(context, RESOURCE_NAME, 'update', security_service)
212 security_service = db.security_service_update(
213 context, id, security_service_data)
214 return self._view_builder.detail(req, security_service)
216 def create(self, req, body):
217 """Creates a new security service."""
218 context = req.environ['manila.context']
219 policy.check_policy(context, RESOURCE_NAME, 'create')
221 if not self.is_valid_body(body, 'security_service'):
222 raise exc.HTTPUnprocessableEntity()
224 security_service_args = body['security_service']
225 security_srv_type = security_service_args.get('type')
226 allowed_types = constants.SECURITY_SERVICES_ALLOWED_TYPES
227 if security_srv_type not in allowed_types:
228 raise exception.InvalidInput(
229 reason=(_("Invalid type %(type)s specified for security "
230 "service. Valid types are %(types)s") %
231 {'type': security_srv_type,
232 'types': ','.join(allowed_types)}))
233 server = security_service_args.get('server')
234 default_ad_site = security_service_args.get('default_ad_site')
235 if default_ad_site:
236 if req.api_version_request < api_version.APIVersionRequest("2.76"): 236 ↛ 237line 236 didn't jump to line 237 because the condition on line 236 was never true
237 msg = _('"default_ad_site" is only supported from API '
238 'version 2.76.')
239 raise webob.exc.HTTPBadRequest(explanation=msg)
241 if (security_srv_type == 'active_directory' and server and 241 ↛ 247line 241 didn't jump to line 247 because the condition on line 241 was always true
242 default_ad_site):
243 raise exception.InvalidInput(
244 reason=(_("Cannot create security service because both "
245 "server and 'default_ad_site' were provided, "
246 "Specify either server or 'default_ad_site'.")))
247 security_service_args['project_id'] = context.project_id
248 security_service = db.security_service_create(
249 context, security_service_args)
251 return self._view_builder.detail(req, security_service)
254def create_resource():
255 return wsgi.Resource(SecurityServiceController())