Coverage for manila/share/drivers/windows/windows_smb_helper.py: 97%
126 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2015 Cloudbase Solutions SRL
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import json
17import os
19from oslo_log import log
21from manila.common import constants
22from manila import exception
23from manila.share.drivers import helpers
24from manila.share.drivers.windows import windows_utils
26LOG = log.getLogger(__name__)
29class WindowsSMBHelper(helpers.CIFSHelperBase):
30 _SHARE_ACCESS_RIGHT_MAP = {
31 constants.ACCESS_LEVEL_RW: "Change",
32 constants.ACCESS_LEVEL_RO: "Read"}
34 _NULL_SID = "S-1-0-0"
36 _WIN_ACL_ALLOW = 0
37 _WIN_ACL_DENY = 1
39 _WIN_ACCESS_RIGHT_FULL = 0
40 _WIN_ACCESS_RIGHT_CHANGE = 1
41 _WIN_ACCESS_RIGHT_READ = 2
42 _WIN_ACCESS_RIGHT_CUSTOM = 3
44 _ACCESS_LEVEL_CUSTOM = 'custom'
46 _WIN_ACL_MAP = {
47 _WIN_ACCESS_RIGHT_CHANGE: constants.ACCESS_LEVEL_RW,
48 _WIN_ACCESS_RIGHT_FULL: constants.ACCESS_LEVEL_RW,
49 _WIN_ACCESS_RIGHT_READ: constants.ACCESS_LEVEL_RO,
50 _WIN_ACCESS_RIGHT_CUSTOM: _ACCESS_LEVEL_CUSTOM,
51 }
53 _SUPPORTED_ACCESS_LEVELS = (constants.ACCESS_LEVEL_RO,
54 constants.ACCESS_LEVEL_RW)
55 _SUPPORTED_ACCESS_TYPES = ('user', )
57 def __init__(self, remote_execute, configuration):
58 self._remote_exec = remote_execute
59 self.configuration = configuration
60 self._windows_utils = windows_utils.WindowsUtils(
61 remote_execute=remote_execute)
63 def init_helper(self, server):
64 self._remote_exec(server, "Get-SmbShare")
66 def create_exports(self, server, share_name, recreate=False):
67 export_location = '\\\\%s\\%s' % (server['public_address'],
68 share_name)
69 if not self._share_exists(server, share_name):
70 share_path = self._windows_utils.normalize_path(
71 os.path.join(self.configuration.share_mount_path,
72 share_name))
73 # If no access rules are requested, 'Everyone' will have read
74 # access, by default. We set read access for the 'NULL SID' in
75 # order to avoid this.
76 cmd = ['New-SmbShare', '-Name', share_name, '-Path', share_path,
77 '-ReadAccess', "*%s" % self._NULL_SID]
78 self._remote_exec(server, cmd)
79 else:
80 LOG.info("Skipping creating export %s as it already exists.",
81 share_name)
82 return self.get_exports_for_share(server, export_location)
84 def remove_exports(self, server, share_name):
85 if self._share_exists(server, share_name): 85 ↛ 89line 85 didn't jump to line 89 because the condition on line 85 was always true
86 cmd = ['Remove-SmbShare', '-Name', share_name, "-Force"]
87 self._remote_exec(server, cmd)
88 else:
89 LOG.debug("Skipping removing export %s as it does not exist.",
90 share_name)
92 def _get_volume_path_by_share_name(self, server, share_name):
93 share_path = self._get_share_path_by_name(server, share_name)
94 volume_path = self._windows_utils.get_volume_path_by_mount_path(
95 server, share_path)
96 return volume_path
98 def _get_acls(self, server, share_name):
99 cmd = ('Get-SmbShareAccess -Name %(share_name)s | '
100 'Select-Object @("Name", "AccountName", '
101 '"AccessControlType", "AccessRight") | '
102 'ConvertTo-JSON -Compress' % {'share_name': share_name})
103 (out, err) = self._remote_exec(server, cmd)
105 if not out.strip():
106 return []
108 raw_acls = json.loads(out)
109 if isinstance(raw_acls, dict):
110 return [raw_acls]
111 return raw_acls
113 def get_access_rules(self, server, share_name):
114 raw_acls = self._get_acls(server, share_name)
115 acls = []
117 for raw_acl in raw_acls:
118 access_to = raw_acl['AccountName']
119 access_right = raw_acl['AccessRight']
120 access_level = self._WIN_ACL_MAP[access_right]
121 access_allow = raw_acl["AccessControlType"] == self._WIN_ACL_ALLOW
123 if not access_allow:
124 if access_to.lower() == 'everyone' and len(raw_acls) == 1: 124 ↛ 125line 124 didn't jump to line 125 because the condition on line 124 was never true
125 LOG.debug("No access rules are set yet for share %s",
126 share_name)
127 else:
128 LOG.warning(
129 "Found explicit deny ACE rule that was not "
130 "created by Manila and will be ignored: %s",
131 raw_acl)
132 continue
133 if access_level == self._ACCESS_LEVEL_CUSTOM:
134 LOG.warning(
135 "Found 'custom' ACE rule that will be ignored: %s",
136 raw_acl)
137 continue
138 elif access_right == self._WIN_ACCESS_RIGHT_FULL:
139 LOG.warning(
140 "Account '%(access_to)s' was given full access "
141 "right on share %(share_name)s. Manila only "
142 "grants 'change' access.",
143 {'access_to': access_to,
144 'share_name': share_name})
146 acl = {
147 'access_to': access_to,
148 'access_level': access_level,
149 'access_type': 'user',
150 }
151 acls.append(acl)
152 return acls
154 def _grant_share_access(self, server, share_name, access_level, access_to):
155 access_right = self._SHARE_ACCESS_RIGHT_MAP[access_level]
156 cmd = ["Grant-SmbShareAccess", "-Name", share_name,
157 "-AccessRight", access_right,
158 "-AccountName", "'%s'" % access_to, "-Force"]
159 self._remote_exec(server, cmd)
160 self._refresh_acl(server, share_name)
161 LOG.info("Granted %(access_level)s access to '%(access_to)s' "
162 "on share %(share_name)s",
163 {'access_level': access_level,
164 'access_to': access_to,
165 'share_name': share_name})
167 def _refresh_acl(self, server, share_name):
168 cmd = ['Set-SmbPathAcl', '-ShareName', share_name]
169 self._remote_exec(server, cmd)
171 def _revoke_share_access(self, server, share_name, access_to):
172 cmd = ['Revoke-SmbShareAccess', '-Name', share_name,
173 '-AccountName', '"%s"' % access_to, '-Force']
174 self._remote_exec(server, cmd)
175 self._refresh_acl(server, share_name)
176 LOG.info("Revoked access to '%(access_to)s' "
177 "on share %(share_name)s",
178 {'access_to': access_to,
179 'share_name': share_name})
181 def update_access(self, server, share_name, access_rules, add_rules,
182 delete_rules, update_rules):
183 self.validate_access_rules(
184 access_rules + add_rules,
185 self._SUPPORTED_ACCESS_TYPES,
186 self._SUPPORTED_ACCESS_LEVELS)
188 if not (add_rules or delete_rules):
189 existing_rules = self.get_access_rules(server, share_name)
190 add_rules, delete_rules = self._get_rule_updates(
191 existing_rules=existing_rules,
192 requested_rules=access_rules)
193 LOG.debug(("Missing rules: %(add_rules)s, "
194 "superfluous rules: %(delete_rules)s"),
195 {'add_rules': add_rules,
196 'delete_rules': delete_rules})
198 # Some rules may have changed, so we'll
199 # treat the deleted rules first.
200 for deleted_rule in delete_rules:
201 try:
202 self.validate_access_rules(
203 [deleted_rule],
204 self._SUPPORTED_ACCESS_TYPES,
205 self._SUPPORTED_ACCESS_LEVELS)
206 except (exception.InvalidShareAccess,
207 exception.InvalidShareAccessLevel):
208 # This check will allow invalid rules to be deleted.
209 LOG.warning(
210 "Unsupported access level %(level)s or access type "
211 "%(type)s, skipping removal of access rule to "
212 "%(to)s.", {'level': deleted_rule['access_level'],
213 'type': deleted_rule['access_type'],
214 'to': deleted_rule['access_to']})
215 continue
216 self._revoke_share_access(server, share_name,
217 deleted_rule['access_to'])
219 for added_rule in add_rules:
220 self._grant_share_access(server, share_name,
221 added_rule['access_level'],
222 added_rule['access_to'])
224 def _subtract_access_rules(self, access_rules, subtracted_rules):
225 # Account names are case insensitive on Windows.
226 filter_rules = lambda rules: [ # noqa: E731
227 {'access_to': access_rule['access_to'].lower(),
228 'access_level': access_rule['access_level'],
229 'access_type': access_rule['access_type']}
230 for access_rule in rules]
232 return [rule for rule in filter_rules(access_rules)
233 if rule not in filter_rules(subtracted_rules)]
235 def _get_rule_updates(self, existing_rules, requested_rules):
236 added_rules = self._subtract_access_rules(requested_rules,
237 existing_rules)
238 deleted_rules = self._subtract_access_rules(existing_rules,
239 requested_rules)
240 return added_rules, deleted_rules
242 def _get_share_name(self, export_location):
243 return self._windows_utils.normalize_path(
244 export_location).split('\\')[-1]
246 def _get_export_location_template(self, old_export_location):
247 share_name = self._get_share_name(old_export_location)
248 return '\\\\%s' + ('\\%s' % share_name)
250 def _get_share_path_by_name(self, server, share_name,
251 ignore_missing=False):
252 cmd = ('Get-SmbShare -Name %s | '
253 'Select-Object -ExpandProperty Path' % share_name)
255 check_exit_code = not ignore_missing
256 (share_path, err) = self._remote_exec(server, cmd,
257 check_exit_code=check_exit_code)
258 return share_path.strip() if share_path else None
260 def get_share_path_by_export_location(self, server, export_location):
261 share_name = self._get_share_name(export_location)
262 return self._get_share_path_by_name(server, share_name)
264 def _share_exists(self, server, share_name):
265 share_path = self._get_share_path_by_name(server, share_name,
266 ignore_missing=True)
267 return bool(share_path)