Coverage for manila/share/drivers/netapp/dataontap/protocols/cifs_cmode.py: 100%
86 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
# Copyright (c) 2015 Clinton Knight. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
NetApp cDOT CIFS protocol helper class.
"""
18import re
20from manila.common import constants
21from manila import exception
22from manila.i18n import _
23from manila.share.drivers.netapp.dataontap.protocols import base
24from manila.share.drivers.netapp import utils as na_utils
class NetAppCmodeCIFSHelper(base.NetAppBaseHelper):
    """NetApp cDOT CIFS protocol helper class.

    Creates and removes CIFS shares and reconciles their user/group access
    rules by issuing calls through ``self._client``.
    """

    @na_utils.trace
    def create_share(self, share, share_name,
                     clear_current_export_policy=True,
                     ensure_share_already_exists=False, replica=False,
                     is_flexgroup=False):
        """Creates CIFS share if does not exist on Data ONTAP Vserver.

        The new CIFS share has Everyone access, so it removes all access after
        creating.

        :param share: share entity (not read by this method).
        :param share_name: share name that must be the CIFS share name.
        :param clear_current_export_policy: ignored, NFS only.
        :param ensure_share_already_exists: ensures that CIFS share exists.
        :param replica: it is a replica volume (DP type).
        :param is_flexgroup: whether the share is a FlexGroup or not (not
            read by this method).
        :returns: a callable that takes an export address and returns the
            backslash-separated export location for this share.
        :raises NetAppException: if ``ensure_share_already_exists`` is set
            and the CIFS share is not present on the backend.
        """
        cifs_exist = self._client.cifs_share_exists(share_name)
        export_path = self._client.get_volume_junction_path(share_name)

        if ensure_share_already_exists and not cifs_exist:
            msg = _("The expected CIFS share %(share_name)s was not found.")
            msg_args = {'share_name': share_name}
            raise exception.NetAppException(msg % msg_args)
        elif not cifs_exist:
            self._client.create_cifs_share(share_name, export_path)
            self._client.remove_cifs_share_access(share_name, 'Everyone')

        # Ensure 'ntfs' security style for RW volume. DP volumes cannot set it.
        if not replica:
            self._client.set_volume_security_style(share_name,
                                                   security_style='ntfs')

        # Return a callback that may be used for generating export paths
        # for this share. The junction path is bound as a default argument so
        # the callback carries the value captured at creation time.
        return (lambda export_address, export_path=export_path:
                r'\\%s%s' % (export_address, export_path.replace('/', '\\')))

    @na_utils.trace
    def delete_share(self, share, share_name):
        """Deletes CIFS share on Data ONTAP Vserver.

        The CIFS share that is removed is the one named in the share's
        export location, not the ``share_name`` argument.

        :param share: share entity whose export location names the CIFS share.
        :param share_name: kept for interface compatibility; not used.
        """
        _, cifs_share_name = self._get_export_location(share)
        self._client.remove_cifs_share(cifs_share_name)

    @na_utils.trace
    @base.access_rules_synchronized
    def update_access(self, share, share_name, rules):
        """Replaces the list of access rules known to the backend storage.

        :param share: share entity whose export location names the CIFS share.
        :param share_name: kept for interface compatibility; the CIFS share
            name is taken from the export location instead.
        :param rules: iterable of rule dicts providing 'access_type',
            'access_level' and 'access_to'.
        :raises InvalidShareAccess: if a rule's type is not 'user'.
        :raises InvalidShareAccessLevel: if a rule's level is not supported.
        """
        _, cifs_share_name = self._get_export_location(share)

        # Ensure rules are valid before touching the backend.
        for rule in rules:
            self._validate_access_rule(rule)

        new_rules = {rule['access_to']: rule['access_level'] for rule in rules}

        # Get rules from share
        existing_rules = self._get_access_rules(share, cifs_share_name)

        # Update rules in an order that will prevent transient disruptions
        self._handle_added_rules(cifs_share_name, existing_rules, new_rules)
        self._handle_ro_to_rw_rules(cifs_share_name, existing_rules, new_rules)
        self._handle_rw_to_ro_rules(cifs_share_name, existing_rules, new_rules)
        self._handle_deleted_rules(cifs_share_name, existing_rules, new_rules)

    @na_utils.trace
    def _validate_access_rule(self, rule):
        """Checks whether access rule type and level are valid.

        :param rule: rule dict providing 'access_type' and 'access_level'.
        :raises InvalidShareAccess: if the access type is not 'user'.
        :raises InvalidShareAccessLevel: if the access level is not one of
            ``constants.ACCESS_LEVELS``.
        """
        if rule['access_type'] != 'user':
            msg = _("Clustered Data ONTAP supports only 'user' type for "
                    "share access rules with CIFS protocol.")
            raise exception.InvalidShareAccess(reason=msg)

        if rule['access_level'] not in constants.ACCESS_LEVELS:
            raise exception.InvalidShareAccessLevel(level=rule['access_level'])

    @na_utils.trace
    def _handle_added_rules(self, share_name, existing_rules, new_rules):
        """Adds access for every user/group present only in the new rules.

        :param share_name: CIFS share name.
        :param existing_rules: mapping keyed by user/group currently on the
            backend.
        :param new_rules: mapping of user/group to requested access level.
        """
        # Select first, then act, so the selection never interleaves with
        # backend calls.
        added_rules = {
            user_or_group: permission
            for user_or_group, permission in new_rules.items()
            if user_or_group not in existing_rules
        }

        for user_or_group, permission in added_rules.items():
            self._client.add_cifs_share_access(
                share_name, user_or_group, self._is_readonly(permission))

    @na_utils.trace
    def _handle_ro_to_rw_rules(self, share_name, existing_rules, new_rules):
        """Updates access rules modified (RO-->RW) between two rule sets.

        A rule is modified when the user/group already exists on the backend,
        the requested level is RW, and the backend permission is anything
        other than 'full_control'.

        :param share_name: CIFS share name.
        :param existing_rules: mapping of user/group to backend permission.
        :param new_rules: mapping of user/group to requested access level.
        """
        modified_rules = {
            user_or_group: permission
            for user_or_group, permission in new_rules.items()
            if (user_or_group in existing_rules and
                permission == constants.ACCESS_LEVEL_RW and
                existing_rules[user_or_group] != 'full_control')
        }

        for user_or_group, permission in modified_rules.items():
            self._client.modify_cifs_share_access(
                share_name, user_or_group, self._is_readonly(permission))

    @na_utils.trace
    def _handle_rw_to_ro_rules(self, share_name, existing_rules, new_rules):
        """Updates access rules modified (RW-->RO) between two rule sets.

        A rule is modified when the user/group already exists on the backend,
        the requested level is RO, and the backend permission is anything
        other than 'read'.

        :param share_name: CIFS share name.
        :param existing_rules: mapping of user/group to backend permission.
        :param new_rules: mapping of user/group to requested access level.
        """
        modified_rules = {
            user_or_group: permission
            for user_or_group, permission in new_rules.items()
            if (user_or_group in existing_rules and
                permission == constants.ACCESS_LEVEL_RO and
                existing_rules[user_or_group] != 'read')
        }

        for user_or_group, permission in modified_rules.items():
            self._client.modify_cifs_share_access(
                share_name, user_or_group, self._is_readonly(permission))

    @na_utils.trace
    def _handle_deleted_rules(self, share_name, existing_rules, new_rules):
        """Removes access for every user/group absent from the new rules.

        :param share_name: CIFS share name.
        :param existing_rules: mapping keyed by user/group currently on the
            backend.
        :param new_rules: mapping keyed by user/group that should remain.
        """
        # Only the user/group names matter here; snapshot them before
        # issuing any backend call.
        deleted_users_or_groups = [
            user_or_group
            for user_or_group in existing_rules
            if user_or_group not in new_rules
        ]

        for user_or_group in deleted_users_or_groups:
            self._client.remove_cifs_share_access(share_name, user_or_group)

    @na_utils.trace
    def _get_access_rules(self, share, share_name):
        """Returns the access rules known to the backend storage.

        :param share: share entity (not read by this method).
        :param share_name: CIFS share name to query.
        :returns: whatever ``self._client.get_cifs_share_access`` returns;
            callers in this class use it as a mapping of user/group to
            permission.
        """
        return self._client.get_cifs_share_access(share_name)

    @na_utils.trace
    def get_target(self, share):
        """Returns OnTap target IP based on share export location.

        :param share: share entity.
        :returns: host part of the export location, or '' if it cannot be
            parsed.
        """
        return self._get_export_location(share)[0]

    @na_utils.trace
    def get_share_name_for_share(self, share):
        """Returns the flexvol name that hosts a share.

        The share name parsed from the export location, prefixed with '/',
        is used as the junction path to look up.

        :param share: share entity.
        :returns: the volume's 'name' value, or None if no volume was found
            at that junction path.
        """
        _, volume_junction_path = self._get_export_location(share)
        volume = self._client.get_volume_at_junction_path(
            f"/{volume_junction_path}")
        return volume.get('name') if volume else None

    @na_utils.trace
    def _get_export_location(self, share):
        """Returns host ip and share name for a given CIFS share.

        Accepts export locations written with either backslashes or forward
        slashes. The host part is matched greedily, so the share name is the
        text after the last separator.

        :param share: share entity.
        :returns: tuple ``(host_ip, share_name)``; ``('', '')`` when the
            export location does not have the expected form.
        """
        # With no export location, fall back to three backslashes, which the
        # pattern below parses into an empty host and an empty share name.
        export_location = self._get_share_export_location(share) or '\\\\\\'
        regex = r'^(?:\\\\|//)(?P<host_ip>.*)(?:\\|/)(?P<share_name>.*)$'
        match = re.match(regex, export_location)
        if match:
            return match.group('host_ip'), match.group('share_name')
        return '', ''

    @na_utils.trace
    def cleanup_demoted_replica(self, share, share_name):
        """Cleans up CIFS share for a demoted replica.

        :param share: share entity (not read by this method).
        :param share_name: name of the CIFS share to remove.
        """
        self._client.remove_cifs_share(share_name)