Coverage for manila/share/drivers/windows/windows_smb_helper.py: 97%

126 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Cloudbase Solutions SRL 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import json 

17import os 

18 

19from oslo_log import log 

20 

21from manila.common import constants 

22from manila import exception 

23from manila.share.drivers import helpers 

24from manila.share.drivers.windows import windows_utils 

25 

26LOG = log.getLogger(__name__) 

27 

28 

29class WindowsSMBHelper(helpers.CIFSHelperBase): 

30 _SHARE_ACCESS_RIGHT_MAP = { 

31 constants.ACCESS_LEVEL_RW: "Change", 

32 constants.ACCESS_LEVEL_RO: "Read"} 

33 

34 _NULL_SID = "S-1-0-0" 

35 

36 _WIN_ACL_ALLOW = 0 

37 _WIN_ACL_DENY = 1 

38 

39 _WIN_ACCESS_RIGHT_FULL = 0 

40 _WIN_ACCESS_RIGHT_CHANGE = 1 

41 _WIN_ACCESS_RIGHT_READ = 2 

42 _WIN_ACCESS_RIGHT_CUSTOM = 3 

43 

44 _ACCESS_LEVEL_CUSTOM = 'custom' 

45 

46 _WIN_ACL_MAP = { 

47 _WIN_ACCESS_RIGHT_CHANGE: constants.ACCESS_LEVEL_RW, 

48 _WIN_ACCESS_RIGHT_FULL: constants.ACCESS_LEVEL_RW, 

49 _WIN_ACCESS_RIGHT_READ: constants.ACCESS_LEVEL_RO, 

50 _WIN_ACCESS_RIGHT_CUSTOM: _ACCESS_LEVEL_CUSTOM, 

51 } 

52 

53 _SUPPORTED_ACCESS_LEVELS = (constants.ACCESS_LEVEL_RO, 

54 constants.ACCESS_LEVEL_RW) 

55 _SUPPORTED_ACCESS_TYPES = ('user', ) 

56 

57 def __init__(self, remote_execute, configuration): 

58 self._remote_exec = remote_execute 

59 self.configuration = configuration 

60 self._windows_utils = windows_utils.WindowsUtils( 

61 remote_execute=remote_execute) 

62 

63 def init_helper(self, server): 

64 self._remote_exec(server, "Get-SmbShare") 

65 

66 def create_exports(self, server, share_name, recreate=False): 

67 export_location = '\\\\%s\\%s' % (server['public_address'], 

68 share_name) 

69 if not self._share_exists(server, share_name): 

70 share_path = self._windows_utils.normalize_path( 

71 os.path.join(self.configuration.share_mount_path, 

72 share_name)) 

73 # If no access rules are requested, 'Everyone' will have read 

74 # access, by default. We set read access for the 'NULL SID' in 

75 # order to avoid this. 

76 cmd = ['New-SmbShare', '-Name', share_name, '-Path', share_path, 

77 '-ReadAccess', "*%s" % self._NULL_SID] 

78 self._remote_exec(server, cmd) 

79 else: 

80 LOG.info("Skipping creating export %s as it already exists.", 

81 share_name) 

82 return self.get_exports_for_share(server, export_location) 

83 

84 def remove_exports(self, server, share_name): 

85 if self._share_exists(server, share_name): 85 ↛ 89line 85 didn't jump to line 89 because the condition on line 85 was always true

86 cmd = ['Remove-SmbShare', '-Name', share_name, "-Force"] 

87 self._remote_exec(server, cmd) 

88 else: 

89 LOG.debug("Skipping removing export %s as it does not exist.", 

90 share_name) 

91 

92 def _get_volume_path_by_share_name(self, server, share_name): 

93 share_path = self._get_share_path_by_name(server, share_name) 

94 volume_path = self._windows_utils.get_volume_path_by_mount_path( 

95 server, share_path) 

96 return volume_path 

97 

98 def _get_acls(self, server, share_name): 

99 cmd = ('Get-SmbShareAccess -Name %(share_name)s | ' 

100 'Select-Object @("Name", "AccountName", ' 

101 '"AccessControlType", "AccessRight") | ' 

102 'ConvertTo-JSON -Compress' % {'share_name': share_name}) 

103 (out, err) = self._remote_exec(server, cmd) 

104 

105 if not out.strip(): 

106 return [] 

107 

108 raw_acls = json.loads(out) 

109 if isinstance(raw_acls, dict): 

110 return [raw_acls] 

111 return raw_acls 

112 

113 def get_access_rules(self, server, share_name): 

114 raw_acls = self._get_acls(server, share_name) 

115 acls = [] 

116 

117 for raw_acl in raw_acls: 

118 access_to = raw_acl['AccountName'] 

119 access_right = raw_acl['AccessRight'] 

120 access_level = self._WIN_ACL_MAP[access_right] 

121 access_allow = raw_acl["AccessControlType"] == self._WIN_ACL_ALLOW 

122 

123 if not access_allow: 

124 if access_to.lower() == 'everyone' and len(raw_acls) == 1: 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true

125 LOG.debug("No access rules are set yet for share %s", 

126 share_name) 

127 else: 

128 LOG.warning( 

129 "Found explicit deny ACE rule that was not " 

130 "created by Manila and will be ignored: %s", 

131 raw_acl) 

132 continue 

133 if access_level == self._ACCESS_LEVEL_CUSTOM: 

134 LOG.warning( 

135 "Found 'custom' ACE rule that will be ignored: %s", 

136 raw_acl) 

137 continue 

138 elif access_right == self._WIN_ACCESS_RIGHT_FULL: 

139 LOG.warning( 

140 "Account '%(access_to)s' was given full access " 

141 "right on share %(share_name)s. Manila only " 

142 "grants 'change' access.", 

143 {'access_to': access_to, 

144 'share_name': share_name}) 

145 

146 acl = { 

147 'access_to': access_to, 

148 'access_level': access_level, 

149 'access_type': 'user', 

150 } 

151 acls.append(acl) 

152 return acls 

153 

154 def _grant_share_access(self, server, share_name, access_level, access_to): 

155 access_right = self._SHARE_ACCESS_RIGHT_MAP[access_level] 

156 cmd = ["Grant-SmbShareAccess", "-Name", share_name, 

157 "-AccessRight", access_right, 

158 "-AccountName", "'%s'" % access_to, "-Force"] 

159 self._remote_exec(server, cmd) 

160 self._refresh_acl(server, share_name) 

161 LOG.info("Granted %(access_level)s access to '%(access_to)s' " 

162 "on share %(share_name)s", 

163 {'access_level': access_level, 

164 'access_to': access_to, 

165 'share_name': share_name}) 

166 

167 def _refresh_acl(self, server, share_name): 

168 cmd = ['Set-SmbPathAcl', '-ShareName', share_name] 

169 self._remote_exec(server, cmd) 

170 

171 def _revoke_share_access(self, server, share_name, access_to): 

172 cmd = ['Revoke-SmbShareAccess', '-Name', share_name, 

173 '-AccountName', '"%s"' % access_to, '-Force'] 

174 self._remote_exec(server, cmd) 

175 self._refresh_acl(server, share_name) 

176 LOG.info("Revoked access to '%(access_to)s' " 

177 "on share %(share_name)s", 

178 {'access_to': access_to, 

179 'share_name': share_name}) 

180 

181 def update_access(self, server, share_name, access_rules, add_rules, 

182 delete_rules, update_rules): 

183 self.validate_access_rules( 

184 access_rules + add_rules, 

185 self._SUPPORTED_ACCESS_TYPES, 

186 self._SUPPORTED_ACCESS_LEVELS) 

187 

188 if not (add_rules or delete_rules): 

189 existing_rules = self.get_access_rules(server, share_name) 

190 add_rules, delete_rules = self._get_rule_updates( 

191 existing_rules=existing_rules, 

192 requested_rules=access_rules) 

193 LOG.debug(("Missing rules: %(add_rules)s, " 

194 "superfluous rules: %(delete_rules)s"), 

195 {'add_rules': add_rules, 

196 'delete_rules': delete_rules}) 

197 

198 # Some rules may have changed, so we'll 

199 # treat the deleted rules first. 

200 for deleted_rule in delete_rules: 

201 try: 

202 self.validate_access_rules( 

203 [deleted_rule], 

204 self._SUPPORTED_ACCESS_TYPES, 

205 self._SUPPORTED_ACCESS_LEVELS) 

206 except (exception.InvalidShareAccess, 

207 exception.InvalidShareAccessLevel): 

208 # This check will allow invalid rules to be deleted. 

209 LOG.warning( 

210 "Unsupported access level %(level)s or access type " 

211 "%(type)s, skipping removal of access rule to " 

212 "%(to)s.", {'level': deleted_rule['access_level'], 

213 'type': deleted_rule['access_type'], 

214 'to': deleted_rule['access_to']}) 

215 continue 

216 self._revoke_share_access(server, share_name, 

217 deleted_rule['access_to']) 

218 

219 for added_rule in add_rules: 

220 self._grant_share_access(server, share_name, 

221 added_rule['access_level'], 

222 added_rule['access_to']) 

223 

224 def _subtract_access_rules(self, access_rules, subtracted_rules): 

225 # Account names are case insensitive on Windows. 

226 filter_rules = lambda rules: [ # noqa: E731 

227 {'access_to': access_rule['access_to'].lower(), 

228 'access_level': access_rule['access_level'], 

229 'access_type': access_rule['access_type']} 

230 for access_rule in rules] 

231 

232 return [rule for rule in filter_rules(access_rules) 

233 if rule not in filter_rules(subtracted_rules)] 

234 

235 def _get_rule_updates(self, existing_rules, requested_rules): 

236 added_rules = self._subtract_access_rules(requested_rules, 

237 existing_rules) 

238 deleted_rules = self._subtract_access_rules(existing_rules, 

239 requested_rules) 

240 return added_rules, deleted_rules 

241 

242 def _get_share_name(self, export_location): 

243 return self._windows_utils.normalize_path( 

244 export_location).split('\\')[-1] 

245 

246 def _get_export_location_template(self, old_export_location): 

247 share_name = self._get_share_name(old_export_location) 

248 return '\\\\%s' + ('\\%s' % share_name) 

249 

250 def _get_share_path_by_name(self, server, share_name, 

251 ignore_missing=False): 

252 cmd = ('Get-SmbShare -Name %s | ' 

253 'Select-Object -ExpandProperty Path' % share_name) 

254 

255 check_exit_code = not ignore_missing 

256 (share_path, err) = self._remote_exec(server, cmd, 

257 check_exit_code=check_exit_code) 

258 return share_path.strip() if share_path else None 

259 

260 def get_share_path_by_export_location(self, server, export_location): 

261 share_name = self._get_share_name(export_location) 

262 return self._get_share_path_by_name(server, share_name) 

263 

264 def _share_exists(self, server, share_name): 

265 share_path = self._get_share_path_by_name(server, share_name, 

266 ignore_missing=True) 

267 return bool(share_path)