Coverage for manila/share/drivers/container/protocol_helper.py: 97%

76 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2016 Mirantis, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16from oslo_log import log 

17 

18from manila.common import constants as const 

19from manila import exception 

20from manila.i18n import _ 

21 

22LOG = log.getLogger(__name__) 

23 

24 

class DockerCIFSHelper:
    """Manage CIFS shares inside a container through ``net conf`` commands.

    Every operation is performed by handing a ``net conf ...`` command line
    to ``container_helper.execute()``.
    """

    def __init__(self, container_helper, *args, **kwargs):
        """Remember the container helper and the optional share/config.

        :param container_helper: object providing ``execute()`` and
            ``fetch_container_addresses()``.
        :param kwargs: ``share`` and ``config`` are read when supplied;
            each defaults to None otherwise.
        """
        super().__init__()
        self.share = kwargs.get("share")
        self.conf = kwargs.get("config")
        self.container = container_helper

    def create_share(self, server_id):
        """Register the share and return its export locations.

        :param server_id: identifier forwarded to the container helper.
        :returns: a list with one export-location dict per address
            reported by ``fetch_container_addresses()``.
        """
        share_name = self.share.share_id
        guest_ok = "y" if self.conf.container_cifs_guest_ok else "n"
        self.container.execute(
            server_id,
            ["net", "conf", "addshare", share_name,
             "/shares/%s" % share_name, "writeable=y",
             "guest_ok=%s" % guest_ok]
        )
        parameters = {
            "browseable": "yes",
            "create mask": "0755",
            "read only": "no",
        }
        for param, value in parameters.items():
            self.container.execute(
                server_id,
                ["net", "conf", "setparm", share_name, param, value]
            )
        # TODO(tbarron): pass configured address family when we support IPv6
        addresses = self.container.fetch_container_addresses(
            server_id, address_family="inet")
        return [
            {
                "is_admin_only": False,
                "path": "//%(ip_address)s/%(share_name)s" % {
                    "ip_address": address,
                    "share_name": share_name,
                },
                "preferred": False,
            }
            for address in addresses
        ]

    def delete_share(self, server_id, share_name, ignore_errors=False):
        """Remove the share definition with ``net conf delshare``.

        :param server_id: identifier forwarded to the container helper.
        :param share_name: name of the share to delete.
        :param ignore_errors: forwarded unchanged to ``execute()``.
        """
        self.container.execute(
            server_id,
            ["net", "conf", "delshare", share_name],
            ignore_errors=ignore_errors
        )

    def _get_access_group(self, access_level):
        """Map an access level to the share parameter holding its users.

        :returns: "read list" for read-only, "valid users" for read-write.
        :raises InvalidShareAccessLevel: for any other access level.
        """
        if access_level == const.ACCESS_LEVEL_RO:
            return "read list"
        if access_level == const.ACCESS_LEVEL_RW:
            return "valid users"
        raise exception.InvalidShareAccessLevel(level=access_level)

    def _get_existing_users(self, server_id, share_name, access):
        """Return the current value of the ``access`` share parameter.

        Errors from the command are ignored (``ignore_errors=True``).

        :returns: the first element of the ``execute()`` result with
            trailing newlines stripped, or "" when the result is falsy.
        """
        result = self.container.execute(
            server_id,
            ["net", "conf", "getparm", share_name, access],
            ignore_errors=True
        )
        if not result:
            return ""
        return result[0].rstrip('\n')

    def _set_users(self, server_id, share_name, access, users_to_set):
        """Overwrite the ``access`` share parameter with ``users_to_set``."""
        self.container.execute(
            server_id,
            ["net", "conf", "setparm", share_name, access, users_to_set]
        )

    @staticmethod
    def _merge_users(existing_users, user):
        """Return ``existing_users`` extended by ``user`` when needed.

        The list is treated as whitespace separated, matching how
        ``_deny_access`` splits it.

        :param existing_users: current user list as a single string.
        :param user: user name to add.
        :returns: ``existing_users`` unchanged if ``user`` is already
            listed, ``user`` alone if the list is empty, otherwise both
            joined by a single space.
        """
        current = existing_users.split()
        if user in current:
            # Re-applying a rule must not grow the list with duplicates.
            return existing_users
        if not current:
            # Avoid writing a value with a stray leading space.
            return user
        return " ".join([existing_users, user])

    def _allow_access(self, share_name, server_id, user_to_allow,
                      access_level):
        """Add ``user_to_allow`` to the user list for ``access_level``.

        :raises InvalidShareAccessLevel: if ``access_level`` is neither
            read-only nor read-write.
        """
        access = self._get_access_group(access_level)
        try:
            existing_users = self._get_existing_users(server_id, share_name,
                                                      access)
        except TypeError:
            # The current list could not be read; start a fresh one.
            users_to_allow = user_to_allow
        else:
            users_to_allow = self._merge_users(existing_users, user_to_allow)
        self._set_users(server_id, share_name, access, users_to_allow)

    def _deny_access(self, share_name, server_id, user_to_deny,
                     access_level):
        """Remove ``user_to_deny`` from the user list for ``access_level``.

        The remaining users are written back sorted and space separated;
        nothing is written when that string equals the current value.

        :raises InvalidShareAccessLevel: if ``access_level`` is neither
            read-only nor read-write.
        """
        access = self._get_access_group(access_level)
        try:
            existing_users = self._get_existing_users(server_id, share_name,
                                                      access)
        except TypeError:
            LOG.warning("Can't access smbd at share %s.", share_name)
            return
        remaining = set(existing_users.split()) - {user_to_deny}
        allowed_users = " ".join(sorted(remaining))
        if allowed_users != existing_users:
            self._set_users(server_id, share_name, access, allowed_users)

    def update_access(self, server_id, share_name, access_rules,
                      add_rules=None, delete_rules=None):
        """Apply access rules to the share.

        When neither ``add_rules`` nor ``delete_rules`` is given, the
        "valid users" parameter is emptied and every rule in
        ``access_rules`` is allowed again. Otherwise ``add_rules`` are
        allowed and ``delete_rules`` are denied.

        :param server_id: identifier forwarded to the container helper.
        :param share_name: name of the share to update.
        :param access_rules: all rules; used only when both other rule
            lists are empty or None.
        :param add_rules: rules to allow.
        :param delete_rules: rules to deny.
        :raises InvalidShareAccess: if a rule being allowed has an access
            type other than 'user'.
        :raises InvalidShareAccessLevel: if a rule carries an access level
            that is neither read-only nor read-write.
        """

        def _rule_updater(rules, action, override_type_check=False):
            for rule in rules:
                access_level = rule['access_level']
                access_type = rule['access_type']
                # (aovchinnikov): override_type_check is used to ensure
                # broken rules deletion.
                if access_type != 'user' and not override_type_check:
                    msg = _("Access type '%s' is not supported by the "
                            "driver.") % access_type
                    raise exception.InvalidShareAccess(reason=msg)
                action(share_name, server_id, rule['access_to'],
                       access_level)

        if add_rules or delete_rules:
            _rule_updater(add_rules or [], self._allow_access)
            _rule_updater(delete_rules or [], self._deny_access,
                          override_type_check=True)
            return

        # clean all users first.
        # NOTE(review): only "valid users" is reset here; "read list" is
        # left untouched - confirm that this is intended.
        self.container.execute(
            server_id,
            ["net", "conf", "setparm", share_name, "valid users", ""]
        )
        _rule_updater(access_rules or [], self._allow_access)