Coverage for manila/share/drivers/netapp/dataontap/protocols/cifs_cmode.py: 100%

86 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Clinton Knight. All rights reserved. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14""" 

15NetApp cDOT CIFS protocol helper class. 

16""" 

17 

18import re 

19 

20from manila.common import constants 

21from manila import exception 

22from manila.i18n import _ 

23from manila.share.drivers.netapp.dataontap.protocols import base 

24from manila.share.drivers.netapp import utils as na_utils 

25 

26 

class NetAppCmodeCIFSHelper(base.NetAppBaseHelper):
    """NetApp cDOT CIFS protocol helper class.

    Every backend operation is delegated to ``self._client``. Access rules
    are handled as mappings of user-or-group name to permission.
    """

    @na_utils.trace
    def create_share(self, share, share_name,
                     clear_current_export_policy=True,
                     ensure_share_already_exists=False, replica=False,
                     is_flexgroup=False):
        """Creates CIFS share if does not exist on Data ONTAP Vserver.

        The new CIFS share has Everyone access, so it removes all access after
        creating.

        :param share: share entity.
        :param share_name: share name that must be the CIFS share name.
        :param clear_current_export_policy: ignored, NFS only.
        :param ensure_share_already_exists: ensures that CIFS share exists.
        :param replica: it is a replica volume (DP type).
        :param is_flexgroup: whether the share is a FlexGroup or not.
        :returns: a callable that takes an export address and returns the
            UNC-style export path (``\\\\<address><junction path>`` with
            forward slashes turned into backslashes).
        :raises NetAppException: if ``ensure_share_already_exists`` is set
            and the CIFS share is not present on the backend.
        """
        cifs_exist = self._client.cifs_share_exists(share_name)
        export_path = self._client.get_volume_junction_path(share_name)

        if not cifs_exist:
            if ensure_share_already_exists:
                msg = _("The expected CIFS share %(share_name)s was not "
                        "found.")
                msg_args = {'share_name': share_name}
                raise exception.NetAppException(msg % msg_args)
            self._client.create_cifs_share(share_name, export_path)
            # Drop the access that the freshly created share starts with.
            self._client.remove_cifs_share_access(share_name, 'Everyone')

        # Ensure 'ntfs' security style for RW volume. DP volumes cannot set it.
        if not replica:
            self._client.set_volume_security_style(share_name,
                                                   security_style='ntfs')

        # Return a callback that may be used for generating export paths
        # for this share. The junction path is bound as a default argument
        # so the callback carries the value looked up above.
        return (lambda export_address, export_path=export_path:
                r'\\%s%s' % (export_address, export_path.replace('/', '\\')))

    @na_utils.trace
    def delete_share(self, share, share_name):
        """Deletes CIFS share on Data ONTAP Vserver.

        The name of the CIFS share to remove is parsed from the export
        location of ``share``; the ``share_name`` argument is not used for
        the lookup.

        :param share: share entity.
        :param share_name: accepted for interface compatibility.
        """
        _, cifs_share_name = self._get_export_location(share)
        self._client.remove_cifs_share(cifs_share_name)

    @na_utils.trace
    @base.access_rules_synchronized
    def update_access(self, share, share_name, rules):
        """Replaces the list of access rules known to the backend storage.

        :param share: share entity.
        :param share_name: accepted for interface compatibility; the CIFS
            share name is parsed from the export location of ``share``.
        :param rules: iterable of access rules, each providing the keys
            'access_type', 'access_to' and 'access_level'.
        :raises InvalidShareAccess: if a rule is not of type 'user'.
        :raises InvalidShareAccessLevel: if a rule has an unknown level.
        """
        _, cifs_share_name = self._get_export_location(share)

        # Ensure rules are valid before touching the backend.
        for rule in rules:
            self._validate_access_rule(rule)

        new_rules = {rule['access_to']: rule['access_level'] for rule in rules}

        # Get rules from share
        existing_rules = self._get_access_rules(share, cifs_share_name)

        # Update rules in an order that will prevent transient disruptions
        self._handle_added_rules(cifs_share_name, existing_rules, new_rules)
        self._handle_ro_to_rw_rules(cifs_share_name, existing_rules, new_rules)
        self._handle_rw_to_ro_rules(cifs_share_name, existing_rules, new_rules)
        self._handle_deleted_rules(cifs_share_name, existing_rules, new_rules)

    @na_utils.trace
    def _validate_access_rule(self, rule):
        """Checks whether access rule type and level are valid.

        :raises InvalidShareAccess: if the access type is not 'user'.
        :raises InvalidShareAccessLevel: if the access level is not one of
            ``constants.ACCESS_LEVELS``.
        """
        if rule['access_type'] != 'user':
            msg = _("Clustered Data ONTAP supports only 'user' type for "
                    "share access rules with CIFS protocol.")
            raise exception.InvalidShareAccess(reason=msg)

        if rule['access_level'] not in constants.ACCESS_LEVELS:
            raise exception.InvalidShareAccessLevel(level=rule['access_level'])

    @na_utils.trace
    def _handle_added_rules(self, share_name, existing_rules, new_rules):
        """Adds access rules that are in the new set but not the old one."""
        for user_or_group, permission in new_rules.items():
            if user_or_group in existing_rules:
                continue
            self._client.add_cifs_share_access(
                share_name, user_or_group, self._is_readonly(permission))

    @na_utils.trace
    def _handle_ro_to_rw_rules(self, share_name, existing_rules, new_rules):
        """Updates access rules modified (RO-->RW) between two rule sets."""
        for user_or_group, permission in new_rules.items():
            # Only touch entries that already exist, are requested as RW and
            # are not already recorded as 'full_control' on the backend.
            if (user_or_group in existing_rules and
                    permission == constants.ACCESS_LEVEL_RW and
                    existing_rules[user_or_group] != 'full_control'):
                self._client.modify_cifs_share_access(
                    share_name, user_or_group, self._is_readonly(permission))

    @na_utils.trace
    def _handle_rw_to_ro_rules(self, share_name, existing_rules, new_rules):
        """Updates access rules modified (RW-->RO) between two rule sets."""
        for user_or_group, permission in new_rules.items():
            # Only touch entries that already exist, are requested as RO and
            # are not already recorded as 'read' on the backend.
            if (user_or_group in existing_rules and
                    permission == constants.ACCESS_LEVEL_RO and
                    existing_rules[user_or_group] != 'read'):
                self._client.modify_cifs_share_access(
                    share_name, user_or_group, self._is_readonly(permission))

    @na_utils.trace
    def _handle_deleted_rules(self, share_name, existing_rules, new_rules):
        """Removes access rules that are in the old set but not the new one."""
        for user_or_group in existing_rules:
            if user_or_group not in new_rules:
                self._client.remove_cifs_share_access(share_name,
                                                      user_or_group)

    @na_utils.trace
    def _get_access_rules(self, share, share_name):
        """Returns the access rules known to the backend storage."""
        return self._client.get_cifs_share_access(share_name)

    @na_utils.trace
    def get_target(self, share):
        """Returns OnTap target IP based on share export location."""
        host_ip, _share_name = self._get_export_location(share)
        return host_ip

    @na_utils.trace
    def get_share_name_for_share(self, share):
        """Returns the flexvol name that hosts a share.

        :returns: the 'name' entry of the volume found at the junction path
            built from the share's export location, or None if the client
            returns no volume.
        """
        _, volume_junction_path = self._get_export_location(share)
        volume = self._client.get_volume_at_junction_path(
            f"/{volume_junction_path}")
        return volume.get('name') if volume else None

    @na_utils.trace
    def _get_export_location(self, share):
        """Returns host ip and share name for a given CIFS share.

        Accepts export locations written with backslashes
        (``\\\\host\\share``) or forward slashes (``//host/share``).

        :returns: tuple ``(host_ip, share_name)``; both are empty strings
            when the share has no export location or it cannot be parsed.
        """
        # Fall back to three backslashes when there is no export location:
        # the pattern below still matches it, with both groups empty.
        export_location = self._get_share_export_location(share) or '\\\\\\'
        # 'host_ip' is greedy, so the split happens at the last separator.
        regex = r'^(?:\\\\|//)(?P<host_ip>.*)(?:\\|/)(?P<share_name>.*)$'
        match = re.match(regex, export_location)
        if not match:
            return '', ''
        return match.group('host_ip'), match.group('share_name')

    @na_utils.trace
    def cleanup_demoted_replica(self, share, share_name):
        """Cleans up CIFS share for a demoted replica."""
        self._client.remove_cifs_share(share_name)