Coverage for manila/lock/api.py: 91%

93 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Licensed under the Apache License, Version 2.0 (the "License"); you may 

2# not use this file except in compliance with the License. You may obtain 

3# a copy of the License at 

4# 

5# http://www.apache.org/licenses/LICENSE-2.0 

6# 

7# Unless required by applicable law or agreed to in writing, software 

8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

10# License for the specific language governing permissions and limitations 

11# under the License. 

12""" 

13Handles all requests related to resource locks. 

14""" 

15 

16from oslo_log import log as logging 

17 

18from manila.common import constants 

19from manila.db import base 

20from manila import exception 

21from manila import policy 

22 

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

24 

25 

class API(base.Base):
    """API for handling resource locks.

    Applies policy and lock-context checks where required and delegates
    persistence to ``self.db``.
    """

    # Name of the ``self.db`` method used to fetch a resource, keyed by
    # resource type.
    resource_get = {
        "share": "share_get",
        "access_rule": "share_access_get_with_context"
    }
    # Resource statuses in which a deletion lock may not be placed, keyed by
    # resource type.
    resource_lock_disallowed_statuses = {
        "share": constants.DISALLOWED_STATUS_WHEN_LOCKING_SHARES,
        "access_rule": constants.DISALLOWED_STATUS_WHEN_LOCKING_ACCESS_RULES
    }

    def _get_lock_context(self, context):
        """Build the ownership fields describing the requester.

        :param context: request context; ``is_service``, ``is_admin``,
            ``user_id`` and ``project_id`` are read from it.
        :returns: dict with ``lock_context`` ('service', 'admin' or 'user',
            checked in that order), ``user_id`` and ``project_id``.
        """
        if context.is_service:
            lock_context = 'service'
        elif context.is_admin:
            lock_context = 'admin'
        else:
            lock_context = 'user'
        return {
            'lock_context': lock_context,
            'user_id': context.user_id,
            'project_id': context.project_id,
        }

    def _check_allow_lock_manipulation(self, context, resource_lock):
        """Reject the request if the lock's lock_context disallows it.

        The logic enforced by this method is that user created locks can be
        manipulated by all roles, service created locks can be manipulated
        by service and admin roles, while admin created locks can only be
        manipulated by admin role:

        +------------+------------+--------------+---------+
        | Requester  | Lock Owner | Lock Context | Allowed |
        +------------+------------+--------------+---------+
        | user       | user       | user         | yes     |
        | user       | user       | service      | no      |
        | user       | admin      | admin        | no      |
        | admin      | user       | user         | yes     |
        | admin      | user       | service      | yes     |
        | admin      | admin      | admin        | yes     |
        | service    | user       | user         | yes     |
        | service    | user       | service      | yes     |
        | service    | admin      | admin        | no      |
        +------------+------------+--------------+---------+

        :param context: request context of the requester.
        :param resource_lock: the lock; only ``lock_context`` is read.
        :raises NotAuthorized: if the combination is not allowed.
        """
        locked_by = resource_lock['lock_context']
        update_requested_by = self._get_lock_context(context)['lock_context']
        if ((locked_by == 'admin' and update_requested_by != 'admin')
                or (locked_by == 'service' and update_requested_by == 'user')):
            raise exception.NotAuthorized("Resource lock cannot be "
                                          "manipulated by user. Please "
                                          "contact the administrator.")

    def access_is_restricted(self, context, resource_lock):
        """Tell whether the requester has visibility restrictions.

        Call the check allow lock manipulation method as a first validation.
        In case it fails, the requester should not have the access rules
        fields entirely visible. In case it passes, the
        ``bypass_locked_show_action`` policy rule is evaluated against the
        lock and decides the outcome.

        :param context: request context of the requester.
        :param resource_lock: the visibility lock being evaluated.
        :returns: True if either check raises ``NotAuthorized``, False
            otherwise.
        """
        try:
            self._check_allow_lock_manipulation(context, resource_lock)
        except exception.NotAuthorized:
            return True

        try:
            policy.check_policy(
                context, 'resource_lock', 'bypass_locked_show_action',
                resource_lock)
        except exception.NotAuthorized:
            return True

        return False

    def get(self, context, lock_id):
        """Return resource lock with the specified id."""
        return self.db.resource_lock_get(context, lock_id)

    def get_all(self, context, search_opts=None, limit=None,
                offset=None, sort_key="created_at", sort_dir="desc",
                show_count=False):
        """Return resource locks for the given context.

        :param context: request context of the requester.
        :param search_opts: optional dict of filters. If it contains
            ``all_projects`` and the ``get_all_projects`` policy rule does
            not pass, ``all_projects`` and ``project_id`` are removed from
            this dict (in place) before querying.
        :param limit: forwarded to the database query.
        :param offset: forwarded to the database query.
        :param sort_key: forwarded to the database query.
        :param sort_dir: forwarded to the database query.
        :param show_count: forwarded to the database query.
        :returns: the ``(locks, count)`` tuple produced by the database
            layer.
        """
        LOG.debug("Searching for locks by: %s", search_opts)

        search_opts = search_opts or {}
        if 'all_projects' in search_opts:
            # do_raise=False: a falsy result means "not allowed"; the query
            # is then silently narrowed instead of failing the request.
            allow_all_projects = policy.check_policy(
                context,
                'resource_lock',
                'get_all_projects',
                do_raise=False
            )
            if not allow_all_projects:
                LOG.warning("User %s not allowed to query locks "
                            "across all projects.", context.user_id)
                search_opts.pop('all_projects')
                search_opts.pop('project_id', None)

        locks, count = self.db.resource_lock_get_all(
            context,
            filters=search_opts,
            limit=limit, offset=offset,
            sort_key=sort_key,
            sort_dir=sort_dir,
            show_count=show_count,
        )

        return locks, count

    def _visibility_lock_exists(self, context, resource_id):
        """Tell whether the resource already has a visibility lock.

        The lookup runs with an elevated context and ``all_projects`` set so
        that the requester's own permissions do not narrow the result.

        :param context: request context of the requester.
        :param resource_id: id of the resource being locked.
        :returns: True if at least one lock with the "show" action is
            returned for the resource, False otherwise.
        """
        # The database call returns a (locks, count) pair; only the list of
        # locks says whether one exists.
        visibility_locks, __ = self.db.resource_lock_get_all(
            context.elevated(),
            filters={'resource_id': resource_id,
                     'resource_action': constants.RESOURCE_ACTION_SHOW,
                     'all_projects': True})
        return bool(visibility_locks)

    def create(self, context, resource_id=None, resource_type=None,
               resource_action=None, lock_reason=None, resource=None):
        """Create a resource lock with the specified information.

        :param context: request context of the requester.
        :param resource_id: id of the resource to lock.
        :param resource_type: a key of ``resource_get``.
        :param resource_action: the action the lock prevents.
        :param lock_reason: free-form reason stored with the lock.
        :param resource: the already-fetched resource; looked up through
            ``self.db`` when omitted.
        :returns: the result of ``self.db.resource_lock_create``.
        :raises KeyError: if ``resource_type`` is not in ``resource_get``.
        :raises ResourceVisibilityLockExists: if a visibility lock is
            requested and the resource already has one.
        :raises InvalidInput: if the resource state does not allow locking.
        """
        get_res_method = getattr(self.db, self.resource_get[resource_type])
        if resource_action == constants.RESOURCE_ACTION_SHOW:
            # We can't allow visibility locks to be placed more than once,
            # otherwise the resource might become visible to someone else.
            if self._visibility_lock_exists(context, resource_id):
                raise exception.ResourceVisibilityLockExists(
                    resource_id=resource_id)
        if resource is None:
            resource = get_res_method(context, resource_id)
        policy.check_policy(context, 'resource_lock', 'create', resource)
        self._check_resource_state_for_locking(
            resource_action, resource, resource_type=resource_type)
        lock_context_data = self._get_lock_context(context)
        resource_lock = lock_context_data.copy()
        resource_lock.update({
            'resource_id': resource_id,
            'resource_action': resource_action,
            'lock_reason': lock_reason,
            'resource_type': resource_type
        })
        return self.db.resource_lock_create(context, resource_lock)

    def _check_resource_state_for_locking(self, resource_action, resource,
                                          resource_type='share'):
        """Check if resource is in a "disallowed" state for locking.

        For example, deletion lock on a "deleting" resource would be futile.

        :param resource_action: the action the lock prevents; the status
            list is only consulted for 'delete'.
        :param resource: mapping describing the resource; ``status`` is
            read, falling back to ``state`` and then to ''.
        :param resource_type: a key of ``resource_lock_disallowed_statuses``.
        :raises InvalidInput: if the status is disallowed, or if a share is
            flagged ``is_soft_deleted``.
        """
        resource_state = resource.get('status', resource.get('state', ''))
        disallowed_statuses = ()
        if resource_action == 'delete':
            disallowed_statuses = (
                self.resource_lock_disallowed_statuses[resource_type])
        if resource_state in disallowed_statuses:
            msg = "Resource status not suitable for locking"
            raise exception.InvalidInput(reason=msg)
        if resource_type == constants.SHARE_RESOURCE_TYPE:
            resource_is_soft_deleted = resource.get('is_soft_deleted', False)
            if resource_is_soft_deleted:
                msg = (
                    "Resource cannot be locked since it has been soft deleted."
                )
                raise exception.InvalidInput(reason=msg)

    def update(self, context, resource_lock, updates):
        """Update a resource lock with the specified information.

        :param context: request context of the requester.
        :param resource_lock: the existing lock; ``id``, ``lock_context``,
            ``resource_id``, ``resource_type`` and ``resource_action`` are
            read.
        :param updates: dict of fields to change.
        :returns: the result of ``self.db.resource_lock_update``.
        :raises NotAuthorized: if the requester may not manipulate the lock.
        :raises InvalidInput: if the lock would become a second visibility
            lock on the resource, or the resource state does not allow the
            new action.
        """
        lock_id = resource_lock['id']
        policy.check_policy(context, 'resource_lock', 'update', resource_lock)
        self._check_allow_lock_manipulation(context, resource_lock)
        if 'resource_action' in updates:
            new_action = updates['resource_action']
            resource_id = resource_lock['resource_id']
            resource_type = resource_lock['resource_type']
            # A resource can have only one visibility lock. The check is
            # only needed when this lock is being turned into one.
            becomes_visibility_lock = (
                new_action == constants.RESOURCE_ACTION_SHOW
                and resource_lock['resource_action'] !=
                constants.RESOURCE_ACTION_SHOW)
            if (becomes_visibility_lock
                    and self._visibility_lock_exists(context, resource_id)):
                msg = "The resource already has a visibility lock."
                raise exception.InvalidInput(reason=msg)
            get_res_method = getattr(
                self.db,
                self.resource_get[resource_type],
            )
            resource = get_res_method(context, resource_id)
            # Validate against the rules of the lock's own resource type
            # rather than the 'share' default.
            self._check_resource_state_for_locking(
                new_action, resource, resource_type=resource_type)
        return self.db.resource_lock_update(context, lock_id, updates)

    def ensure_context_can_delete_lock(self, context, lock_id):
        """Ensure the requester is able to delete locks.

        :param context: request context of the requester.
        :param lock_id: id of the lock to be deleted.
        :raises NotAuthorized: if the requester's role may not manipulate
            the lock.
        """
        resource_lock = self.db.resource_lock_get(context, lock_id)
        policy.check_policy(context, 'resource_lock', 'delete', resource_lock)
        self._check_allow_lock_manipulation(context, resource_lock)

    def delete(self, context, lock_id):
        """Delete resource lock with the specified id.

        The lock is only removed after ``ensure_context_can_delete_lock``
        returns without raising.
        """
        self.ensure_context_can_delete_lock(context, lock_id)
        self.db.resource_lock_delete(context, lock_id)

228 self.db.resource_lock_delete(context, lock_id)