Coverage for manila/lock/api.py: 91%
93 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests related to resource locks.
"""
16from oslo_log import log as logging
18from manila.common import constants
19from manila.db import base
20from manila import exception
21from manila import policy
23LOG = logging.getLogger(__name__)
class API(base.Base):
    """API for handling resource locks."""

    # Maps a lockable resource type to the name of the DB API method used to
    # fetch a resource of that type.
    resource_get = {
        "share": "share_get",
        "access_rule": "share_access_get_with_context"
    }
    # Maps a lockable resource type to the resource statuses for which a
    # deletion lock is refused.
    resource_lock_disallowed_statuses = {
        "share": constants.DISALLOWED_STATUS_WHEN_LOCKING_SHARES,
        "access_rule": constants.DISALLOWED_STATUS_WHEN_LOCKING_ACCESS_RULES
    }

    def _get_lock_context(self, context):
        """Build the ownership fields of a lock from the request context.

        A service context is reported as 'service' even if it is also an
        admin context; an admin context is reported as 'admin'; anything
        else is reported as 'user'.

        :param context: request context of the requester.
        :returns: dict with the keys 'lock_context', 'user_id' and
            'project_id'.
        """
        if context.is_service:
            lock_context = 'service'
        elif context.is_admin:
            lock_context = 'admin'
        else:
            lock_context = 'user'
        return {
            'lock_context': lock_context,
            'user_id': context.user_id,
            'project_id': context.project_id,
        }

    def _check_allow_lock_manipulation(self, context, resource_lock):
        """Lock owners may not manipulate a lock if lock_context disallows

        The logic enforced by this method is that user created locks can be
        manipulated by all roles, service created locks can be manipulated
        by service and admin roles, while admin created locks can only be
        manipulated by admin role:

        +------------+------------+--------------+---------+
        | Requester  | Lock Owner | Lock Context | Allowed |
        +------------+------------+--------------+---------+
        | user       | user       | user         | yes     |
        | user       | user       | service      | no      |
        | user       | admin      | admin        | no      |
        | admin      | user       | user         | yes     |
        | admin      | user       | service      | yes     |
        | admin      | admin      | admin        | yes     |
        | service    | user       | user         | yes     |
        | service    | user       | service      | yes     |
        | service    | admin      | admin        | no      |
        +------------+------------+--------------+---------+

        :param context: request context of the requester.
        :param resource_lock: the lock being manipulated; its 'lock_context'
            value is compared with the requester's lock context.
        :raises: NotAuthorized if the requester may not manipulate the lock.
        """
        locked_by = resource_lock['lock_context']
        update_requested_by = self._get_lock_context(context)['lock_context']
        if ((locked_by == 'admin' and update_requested_by != 'admin')
                or (locked_by == 'service' and update_requested_by == 'user')):
            raise exception.NotAuthorized("Resource lock cannot be "
                                          "manipulated by user. Please "
                                          "contact the administrator.")

    def access_is_restricted(self, context, resource_lock):
        """Ensure the requester doesn't have visibility restrictions

        Call the check allow lock manipulation method as a first validation.
        In case it fails, the requester should not have the access rules
        fields entirely visible. In case it passes and the access visibility
        is restricted, the users will have visibility of all fields only if
        they have originally created the lock.

        :param context: request context of the requester.
        :param resource_lock: the visibility lock to evaluate.
        :returns: True if either the lock manipulation check or the
            'bypass_locked_show_action' policy check raises NotAuthorized,
            False otherwise.
        """
        try:
            self._check_allow_lock_manipulation(context, resource_lock)
        except exception.NotAuthorized:
            return True

        try:
            policy.check_policy(
                context, 'resource_lock', 'bypass_locked_show_action',
                resource_lock)
        except exception.NotAuthorized:
            return True

        return False

    def get(self, context, lock_id):
        """Return resource lock with the specified id."""
        return self.db.resource_lock_get(context, lock_id)

    def get_all(self, context, search_opts=None, limit=None,
                offset=None, sort_key="created_at", sort_dir="desc",
                show_count=False):
        """Return resource locks for the given context.

        :param context: request context of the requester.
        :param search_opts: optional dict of filters handed to the DB layer.
            The caller's dict is never modified.
        :param limit: passed through to the DB layer.
        :param offset: passed through to the DB layer.
        :param sort_key: passed through to the DB layer.
        :param sort_dir: passed through to the DB layer.
        :param show_count: passed through to the DB layer.
        :returns: the ``(locks, count)`` pair returned by the DB layer.
        """
        LOG.debug("Searching for locks by: %s", search_opts)

        # Work on a shallow copy: the filters may be edited below and the
        # caller's dict must not change as a side effect of a query.
        search_opts = dict(search_opts) if search_opts else {}
        if 'all_projects' in search_opts:
            allow_all_projects = policy.check_policy(
                context,
                'resource_lock',
                'get_all_projects',
                do_raise=False
            )
            if not allow_all_projects:
                LOG.warning("User %s not allowed to query locks "
                            "across all projects.", context.user_id)
                # Drop the cross-project request together with any explicit
                # project filter that came with it.
                search_opts.pop('all_projects')
                search_opts.pop('project_id', None)

        locks, count = self.db.resource_lock_get_all(
            context,
            filters=search_opts,
            limit=limit, offset=offset,
            sort_key=sort_key,
            sort_dir=sort_dir,
            show_count=show_count,
        )

        return locks, count

    def create(self, context, resource_id=None, resource_type=None,
               resource_action=None, lock_reason=None, resource=None):
        """Create a resource lock with the specified information.

        :param context: request context of the requester.
        :param resource_id: id of the resource to lock.
        :param resource_type: a key of ``resource_get`` ("share" or
            "access_rule"); any other value fails the lookup with KeyError.
        :param resource_action: the action the lock applies to.
        :param lock_reason: free-form reason stored on the lock.
        :param resource: the resource to lock, if the caller already has
            it; when None it is fetched from the DB by ``resource_id``.
        :returns: the lock as returned by the DB layer.
        :raises: ResourceVisibilityLockExists if a visibility lock is
            requested and the resource already has one.
        :raises: InvalidInput if the resource state does not allow locking.
        """
        get_res_method = getattr(self.db, self.resource_get[resource_type])
        if resource_action == constants.RESOURCE_ACTION_SHOW:
            # We can't allow visibility locks to be placed more than once,
            # otherwise the resource might become visible to someone else.
            visibility_locks, __ = self.db.resource_lock_get_all(
                context.elevated(),
                filters={'resource_id': resource_id,
                         'resource_action': resource_action,
                         'all_projects': True})
            if visibility_locks:
                raise exception.ResourceVisibilityLockExists(
                    resource_id=resource_id)
        if resource is None:
            resource = get_res_method(context, resource_id)
        policy.check_policy(context, 'resource_lock', 'create', resource)
        self._check_resource_state_for_locking(
            resource_action, resource, resource_type=resource_type)
        lock_context_data = self._get_lock_context(context)
        resource_lock = lock_context_data.copy()
        resource_lock.update({
            'resource_id': resource_id,
            'resource_action': resource_action,
            'lock_reason': lock_reason,
            'resource_type': resource_type
        })
        return self.db.resource_lock_create(context, resource_lock)

    def _check_resource_state_for_locking(self, resource_action, resource,
                                          resource_type='share'):
        """Check if resource is in a "disallowed" state for locking.

        For example, deletion lock on a "deleting" resource would be futile.

        :param resource_action: the action the lock applies to; the status
            check is only performed for 'delete'.
        :param resource: mapping describing the resource; its 'status' (or,
            failing that, 'state') and 'is_soft_deleted' values are read.
        :param resource_type: a key of ``resource_lock_disallowed_statuses``.
        :raises: InvalidInput if the resource status is disallowed for a
            deletion lock, or if a share has been soft deleted.
        """
        resource_state = resource.get('status', resource.get('state', ''))
        disallowed_statuses = ()
        if resource_action == 'delete':
            disallowed_statuses = (
                self.resource_lock_disallowed_statuses[resource_type])
        if resource_state in disallowed_statuses:
            msg = "Resource status not suitable for locking"
            raise exception.InvalidInput(reason=msg)
        if resource_type == constants.SHARE_RESOURCE_TYPE:
            resource_is_soft_deleted = resource.get('is_soft_deleted', False)
            if resource_is_soft_deleted:
                msg = (
                    "Resource cannot be locked since it has been soft deleted."
                )
                raise exception.InvalidInput(reason=msg)

    def update(self, context, resource_lock, updates):
        """Update a resource lock with the specified information.

        :param context: request context of the requester.
        :param resource_lock: the existing lock; its 'id', 'resource_id',
            'resource_type', 'resource_action' and 'lock_context' values
            are read.
        :param updates: dict of fields to change on the lock.
        :returns: the result of the DB layer update.
        :raises: NotAuthorized if the requester may not manipulate the lock.
        :raises: InvalidInput if the lock is being turned into a visibility
            lock while the resource already has one, or if the resource
            state does not allow the new action.
        """
        lock_id = resource_lock['id']
        policy.check_policy(context, 'resource_lock', 'update', resource_lock)
        self._check_allow_lock_manipulation(context, resource_lock)
        if 'resource_action' in updates:
            new_action = updates['resource_action']
            resource_id = resource_lock['resource_id']
            resource_type = resource_lock['resource_type']
            # A resource can have only one visibility lock
            if (new_action == constants.RESOURCE_ACTION_SHOW
                    and resource_lock['resource_action'] !=
                    constants.RESOURCE_ACTION_SHOW):
                # Same lookup as in create(): search across all projects
                # with an elevated context. The DB call returns a
                # (locks, count) pair, so unpack it; testing the pair
                # itself would always be true.
                filters = {
                    "resource_id": resource_id,
                    "resource_action": constants.RESOURCE_ACTION_SHOW,
                    "all_projects": True,
                }
                visibility_locks, __ = self.db.resource_lock_get_all(
                    context.elevated(), filters=filters)
                if visibility_locks:
                    msg = "The resource already has a visibility lock."
                    raise exception.InvalidInput(reason=msg)
            get_res_method = getattr(self.db, self.resource_get[resource_type])
            resource = get_res_method(context, resource_id)
            # Validate against the rules of the lock's own resource type
            # rather than the 'share' default.
            self._check_resource_state_for_locking(
                new_action, resource, resource_type=resource_type)
        return self.db.resource_lock_update(context, lock_id, updates)

    def ensure_context_can_delete_lock(self, context, lock_id):
        """Ensure the requester is able to delete locks.

        :param context: request context of the requester.
        :param lock_id: id of the lock to be deleted.
        :raises: NotAuthorized if the requester may not manipulate the lock.
        """
        resource_lock = self.db.resource_lock_get(context, lock_id)
        policy.check_policy(context, 'resource_lock', 'delete', resource_lock)
        self._check_allow_lock_manipulation(context, resource_lock)

    def delete(self, context, lock_id):
        """Delete resource lock with the specified id.

        The lock is only deleted after ``ensure_context_can_delete_lock``
        has accepted the requester.
        """
        self.ensure_context_can_delete_lock(context, lock_id)
        self.db.resource_lock_delete(context, lock_id)