Coverage for manila/context.py: 93%

68 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2011 OpenStack LLC. 

2# Copyright 2010 United States Government as represented by the 

3# Administrator of the National Aeronautics and Space Administration. 

4# All Rights Reserved. 

5# 

6# Licensed under the Apache License, Version 2.0 (the "License"); you may 

7# not use this file except in compliance with the License. You may obtain 

8# a copy of the License at 

9# 

10# http://www.apache.org/licenses/LICENSE-2.0 

11# 

12# Unless required by applicable law or agreed to in writing, software 

13# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

14# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

15# License for the specific language governing permissions and limitations 

16# under the License. 

17 

18"""RequestContext: context for requests that persist through all of manila.""" 

19 

20import copy 

21 

22from oslo_context import context 

23from oslo_db.sqlalchemy import enginefacade 

24from oslo_utils import timeutils 

25 

26from manila.i18n import _ 

27from manila import policy 

28 

29 

30@enginefacade.transaction_context_provider 

31class RequestContext(context.RequestContext): 

32 """Security context and request information. 

33 

34 Represents the user taking a given action within the system. 

35 

36 """ 

37 

38 def __init__(self, user_id=None, project_id=None, is_admin=None, 

39 read_deleted="no", project_name=None, remote_address=None, 

40 timestamp=None, quota_class=None, service_catalog=None, 

41 **kwargs): 

42 """Initialize RequestContext. 

43 

44 :param read_deleted: 'no' indicates deleted records are hidden, 'yes' 

45 indicates deleted records are visible, 'only' indicates that 

46 *only* deleted records are visible. 

47 

48 :param kwargs: Extra arguments passed transparently to 

49 oslo_context.RequestContext. 

50 """ 

51 kwargs.setdefault('user_id', user_id) 

52 kwargs.setdefault('project_id', project_id) 

53 

54 super().__init__(is_admin=is_admin, **kwargs) 

55 

56 self.project_name = project_name 

57 if self.is_admin is None: 

58 self.is_admin = policy.check_is_admin(self) 

59 elif self.is_admin and 'admin' not in self.roles: 

60 self.roles.append('admin') 

61 # a "service" user's token will contain "service_roles" 

62 self.is_service = kwargs.get('service_roles') or False 

63 self.read_deleted = read_deleted 

64 self.remote_address = remote_address 

65 if not timestamp: 

66 timestamp = timeutils.utcnow() 

67 elif isinstance(timestamp, str): 67 ↛ 69line 67 didn't jump to line 69 because the condition on line 67 was always true

68 timestamp = timeutils.parse_isotime(timestamp) 

69 self.timestamp = timestamp 

70 self.quota_class = quota_class 

71 if service_catalog: 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true

72 self.service_catalog = [s for s in service_catalog 

73 if s.get('type') in ('compute', 'volume')] 

74 else: 

75 self.service_catalog = [] 

76 

77 def _get_read_deleted(self): 

78 return self._read_deleted 

79 

80 def _set_read_deleted(self, read_deleted): 

81 if read_deleted not in ('no', 'yes', 'only'): 

82 raise ValueError(_("read_deleted can only be one of 'no', " 

83 "'yes' or 'only', not %r") % read_deleted) 

84 self._read_deleted = read_deleted 

85 

86 def _del_read_deleted(self): 

87 del self._read_deleted 

88 

89 read_deleted = property(_get_read_deleted, _set_read_deleted, 

90 _del_read_deleted) 

91 

92 def to_dict(self): 

93 values = super().to_dict() 

94 values['user_id'] = self.user_id 

95 values['project_id'] = self.project_id 

96 values['project_name'] = self.project_name 

97 values['domain_id'] = self.domain_id 

98 values['read_deleted'] = self.read_deleted 

99 values['remote_address'] = self.remote_address 

100 values['timestamp'] = self.timestamp.isoformat() 

101 values['quota_class'] = self.quota_class 

102 values['service_catalog'] = self.service_catalog 

103 values['request_id'] = self.request_id 

104 return values 

105 

106 @classmethod 

107 def from_dict(cls, values): 

108 return super().from_dict( 

109 values, 

110 user_id=values.get('user_id'), 

111 project_id=values.get('project_id'), 

112 project_name=values.get('project_name'), 

113 domain_id=values.get('domain_id'), 

114 read_deleted=values.get('read_deleted', 'no'), 

115 remote_address=values.get('remote_address'), 

116 timestamp=values.get('timestamp'), 

117 quota_class=values.get('quota_class'), 

118 service_catalog=values.get('service_catalog'), 

119 request_id=values.get('request_id'), 

120 is_admin=values.get('is_admin'), 

121 roles=values.get('roles'), 

122 auth_token=values.get('auth_token'), 

123 user_domain_id=values.get('user_domain_id'), 

124 project_domain_id=values.get('project_domain_id') 

125 ) 

126 

127 def elevated(self, read_deleted=None, overwrite=False): 

128 """Return a version of this context with admin flag set.""" 

129 ctx = copy.deepcopy(self) 

130 ctx.is_admin = True 

131 

132 if 'admin' not in ctx.roles: 

133 ctx.roles.append('admin') 

134 

135 if read_deleted is not None: 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 ctx.read_deleted = read_deleted 

137 

138 return ctx 

139 

140 def to_policy_values(self): 

141 policy = super(RequestContext, self).to_policy_values() 

142 policy['is_admin'] = self.is_admin 

143 return policy 

144 

145 

146def get_admin_context(read_deleted="no"): 

147 return RequestContext(user_id=None, 

148 project_id=None, 

149 is_admin=True, 

150 read_deleted=read_deleted, 

151 overwrite=False)