Coverage for manila/keymgr/barbican.py: 59%

123 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2025 Cloudification GmbH. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import itertools 

17 

18from castellan.key_manager import barbican_key_manager 

19from castellan import options as castellan_options 

20from keystoneauth1 import loading as ks_loading 

21from keystoneauth1 import session as ks_session 

22from keystoneclient.v3 import client as ks_client 

23from oslo_config import cfg 

24from oslo_log import log as logging 

25from oslo_utils import uuidutils 

26 

27from manila.common import client_auth 

28from manila import exception 

29 

30 

31BARBICAN_GROUP = 'barbican' 

32 

33CONF = cfg.CONF 

34LOG = logging.getLogger(__name__) 

35 

36castellan_options.set_defaults(CONF) 

37ks_loading.register_auth_conf_options(CONF, BARBICAN_GROUP) 

38 

39 

40BARBICAN_OPTS = [ 

41 cfg.StrOpt('endpoint_type', 

42 default='publicURL', 

43 choices=['publicURL', 'internalURL', 'adminURL', 

44 'public', 'internal', 'admin'], 

45 help='Endpoint type to be used with keystone client calls.'), 

46 cfg.StrOpt('region_name', 

47 help='Region name for connecting to keystone for ' 

48 'application credential management.'), 

49] 

50 

51CONF.register_opts(BARBICAN_OPTS, BARBICAN_GROUP) 

52 

53 

54def list_opts(): 

55 # NOTE(tkajinam): This likely breaks when castellan fixes missing auth 

56 # plugin options 

57 return itertools.chain( 

58 [(BARBICAN_GROUP, BARBICAN_OPTS)], 

59 client_auth.AuthClientLoader.list_opts(BARBICAN_GROUP) 

60 ) 

61 

62 

63def _require_barbican_key_manager_backend(conf): 

64 backend = conf.key_manager.backend 

65 if backend is None: 65 ↛ 66line 65 didn't jump to line 66 because the condition on line 65 was never true

66 LOG.warning("The BarbicanKeyManager backend should be explicitly " 

67 "used for share encryption.") 

68 raise exception.ManilaBarbicanACLError() 

69 

70 backend = backend.split('.')[-1] 

71 if backend not in ('barbican', 'BarbicanKeyManager'): 

72 LOG.warning("The '%s' key_manager backend is not supported. Please" 

73 " use barbican as key_manager.", backend) 

74 raise exception.ManilaBarbicanACLError() 

75 

76 

77class BarbicanSecretACL(barbican_key_manager.BarbicanKeyManager): 

78 

79 def get_client_and_href(self, context, secret_ref): 

80 """Get user barbican client and a secret href""" 

81 _require_barbican_key_manager_backend(self.conf) 

82 

83 if not secret_ref: 

84 LOG.error("Missing secret_ref provided in current user context.") 

85 raise exception.ManilaBarbicanACLError() 

86 

87 # Establish a Barbican client session of current user and keystone 

88 # session of barbican user to get its user_id. Grant ACL to barbican 

89 # user that it will be used for the key_ref handover process. 

90 try: 

91 user_barbican_client, base_url = self._get_barbican_client(context) 

92 secret_ref = self._create_secret_ref(base_url, secret_ref) 

93 except Exception as e: 

94 LOG.error("Failed to create barbican client. Error: %s", e) 

95 raise exception.ManilaBarbicanACLError() 

96 

97 return user_barbican_client, secret_ref 

98 

99 def _get_barbican_user_id(self): 

100 barbican_auth = ks_loading.load_auth_from_conf_options( 

101 self.conf, BARBICAN_GROUP) 

102 barbican_sess = ks_session.Session(auth=barbican_auth) 

103 barbican_ks_client = ks_client.Client( 

104 session=barbican_sess, 

105 interface=self.conf.barbican.endpoint_type, 

106 region_name=self.conf.barbican.region_name) 

107 return barbican_ks_client.session.get_user_id() 

108 

109 def create_secret_access(self, context, secret_ref): 

110 try: 

111 user_barbican_client, secret_href = self.get_client_and_href( 

112 context, secret_ref) 

113 barbican_user_id = self._get_barbican_user_id() 

114 # Create a Barbican ACL so the barbican user can access it. 

115 acl = user_barbican_client.acls.create(entity_ref=secret_href, 

116 users=[barbican_user_id], 

117 project_access=False) 

118 acl.submit() 

119 except Exception as e: 

120 LOG.error("Failed to create secret ACL. Error: %s", e) 

121 raise exception.ManilaBarbicanACLError() 

122 

123 def delete_secret_access(self, context, secret_ref): 

124 try: 

125 user_barbican_client, secret_href = self.get_client_and_href( 

126 context, secret_ref) 

127 barbican_user_id = self._get_barbican_user_id() 

128 

129 # Remove a Barbican ACL for the barbican user. 

130 acl_entity = user_barbican_client.acls.get(entity_ref=secret_href) 

131 existing_users = acl_entity.read.users 

132 remove_users = [barbican_user_id] 

133 updated_users = set(existing_users).difference(remove_users) 

134 acl_entity.read.users = list(updated_users) 

135 

136 acl_entity.submit() 

137 except Exception as e: 

138 LOG.error("Failed to delete secret ACL. Error: %s", e) 

139 

140 def get_secret_href(self, context, secret_ref): 

141 try: 

142 user_barbican_client, secret_href = self.get_client_and_href( 

143 context, secret_ref) 

144 return secret_href 

145 except Exception as e: 

146 LOG.error("Failed to get barbican secret href. Error: %s", e) 

147 raise exception.ManilaBarbicanACLError() 

148 

149 

150class BarbicanUserAppCreds(object): 

151 def __init__(self, conf): 

152 self.conf = conf 

153 

154 @property 

155 def client(self): 

156 return self.get_client() 

157 

158 def get_client(self): 

159 _require_barbican_key_manager_backend(self.conf) 

160 auth = ks_loading.load_auth_from_conf_options(self.conf, 

161 BARBICAN_GROUP) 

162 sess = ks_session.Session(auth=auth) 

163 return ks_client.Client( 

164 session=sess, 

165 interface=self.conf.barbican.endpoint_type, 

166 region_name=self.conf.barbican.region_name) 

167 

168 def get_application_credentials(self, context, application_credential_id): 

169 if not application_credential_id: 

170 LOG.warning("Missing application credentials ID") 

171 raise exception.ManilaBarbicanAppCredsError() 

172 

173 try: 

174 return self.client.application_credentials.get( 

175 application_credential=application_credential_id) 

176 except Exception as e: 

177 LOG.error("Aborting App Creds request due to error: %s", e) 

178 raise exception.ManilaBarbicanAppCredsError() 

179 

180 def create_application_credentials(self, context, secret): 

181 try: 

182 secrets_path = "/key-manager/v1/secrets" 

183 return self.client.application_credentials.create( 

184 name='manila_barbican_' + uuidutils.generate_uuid(), 

185 user=self.client.session.get_user_id(), 

186 roles=[{'name': 'service'}], 

187 secret=str(secret), 

188 access_rules=[ 

189 { 

190 "path": secrets_path + "/%s" % secret, 

191 "method": "GET", 

192 "service": "key-manager", 

193 }, 

194 { 

195 "path": secrets_path + "/%s/payload" % secret, 

196 "method": "GET", 

197 "service": "key-manager", 

198 } 

199 ] 

200 ) 

201 except Exception as e: 

202 LOG.error("Aborting App Creds create due to error: %s", e) 

203 raise exception.ManilaBarbicanAppCredsError() 

204 

205 def delete_application_credentials(self, context, 

206 application_credential_id): 

207 if not application_credential_id: 

208 LOG.warning("Missing application credentials ID") 

209 raise exception.ManilaBarbicanAppCredsError() 

210 

211 try: 

212 return self.client.application_credentials.delete( 

213 application_credential=application_credential_id) 

214 except Exception as e: 

215 LOG.error("Aborting App Creds request due to error: %s", e) 

216 raise exception.ManilaBarbicanAppCredsError() 

217 

218 

219def create_secret_access(context, secret_ref, conf=CONF): 

220 BarbicanSecretACL(conf).create_secret_access(context, secret_ref) 

221 

222 

223def delete_secret_access(context, secret_ref, conf=CONF): 

224 BarbicanSecretACL(conf).delete_secret_access(context, secret_ref) 

225 

226 

227def get_secret_href(context, secret_ref, conf=CONF): 

228 return BarbicanSecretACL(conf).get_secret_href(context, secret_ref) 

229 

230 

231def create_application_credentials(context, secret, conf=CONF): 

232 return BarbicanUserAppCreds(conf).create_application_credentials( 

233 context, 

234 secret) 

235 

236 

237def get_application_credentials(context, application_credential_id, conf=CONF): 

238 return BarbicanUserAppCreds(conf).get_application_credentials( 

239 context, 

240 application_credential_id) 

241 

242 

243def delete_application_credentials(context, 

244 application_credential_id, conf=CONF): 

245 BarbicanUserAppCreds(conf).delete_application_credentials( 

246 context, 

247 application_credential_id)