Coverage for manila/tests/share/drivers/container/test_security_service_helper.py: 100%

65 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2021 NetApp, Inc. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15"""Unit tests for the Security Service helper module.""" 

16 

17from unittest import mock 

18 

19import ddt 

20 

21from manila import exception 

22from manila.share import configuration 

23from manila.share.drivers.container import security_service_helper 

24from manila import test 

25from manila.tests import db_utils 

26 

27 

28INVALID_CREDENTIALS_EXIT_CODE = 49 

29 

30 

31@ddt.ddt 

32class SecurityServiceHelperTestCase(test.TestCase): 

33 """Tests DockerExecHelper""" 

34 

35 def setUp(self): 

36 super(SecurityServiceHelperTestCase, self).setUp() 

37 self.fake_conf = configuration.Configuration(None) 

38 self.fake_conf.container_image_name = "fake_image" 

39 self.fake_conf.container_volume_mount_path = "/tmp/shares" 

40 self.security_service_helper = ( 

41 security_service_helper.SecurityServiceHelper( 

42 configuration=self.fake_conf)) 

43 

44 def test_setup_security_service(self): 

45 share_server = db_utils.create_share_server() 

46 security_service = db_utils.create_security_service() 

47 

48 mock_ldap_bind = self.mock_object( 

49 self.security_service_helper, 'ldap_bind') 

50 

51 self.security_service_helper.setup_security_service( 

52 share_server['id'], security_service) 

53 

54 mock_ldap_bind.assert_called_once_with( 

55 share_server['id'], security_service) 

56 

57 def test_update_security_service(self): 

58 share_server = db_utils.create_share_server() 

59 current_security_service = db_utils.create_security_service() 

60 new_security_service = db_utils.create_security_service() 

61 

62 mock_ldap_bind = self.mock_object( 

63 self.security_service_helper, 'ldap_bind') 

64 

65 self.security_service_helper.update_security_service( 

66 share_server['id'], current_security_service, new_security_service) 

67 

68 mock_ldap_bind.assert_called_once_with( 

69 share_server['id'], new_security_service) 

70 

71 def _setup_test_ldap_bind_tests(self): 

72 share_server = db_utils.create_security_service() 

73 security_service = db_utils.create_security_service() 

74 ldap_get_info = { 

75 'ss_password': security_service['password'], 

76 'ss_user': security_service['user'] 

77 } 

78 expected_cmd = [ 

79 "docker", "exec", "%s" % share_server['id'], "ldapwhoami", "-x", 

80 "-H", "ldap://localhost:389", "-D", 

81 "cn=%s,dc=example,dc=com" % ldap_get_info[ 

82 "ss_user"], 

83 "-w", "%s" % ldap_get_info["ss_password"]] 

84 

85 return share_server, security_service, ldap_get_info, expected_cmd 

86 

87 def test_ldap_bind(self): 

88 share_server, security_service, ldap_get_info, expected_cmd = ( 

89 self._setup_test_ldap_bind_tests()) 

90 

91 mock_ldap_get_info = self.mock_object( 

92 self.security_service_helper, 'ldap_get_info', 

93 mock.Mock(return_value=ldap_get_info)) 

94 mock_ldap_retry_operation = self.mock_object( 

95 self.security_service_helper, 'ldap_retry_operation') 

96 

97 self.security_service_helper.ldap_bind( 

98 share_server['id'], security_service) 

99 

100 mock_ldap_get_info.assert_called_once_with(security_service) 

101 mock_ldap_retry_operation.assert_called_once_with(expected_cmd, 

102 run_as_root=True) 

103 

104 def test_ldap_get_info(self): 

105 security_service = db_utils.create_security_service() 

106 expected_ldap_get_info = { 

107 'ss_password': security_service['password'], 

108 'ss_user': security_service['user'] 

109 } 

110 

111 ldap_get_info = self.security_service_helper.ldap_get_info( 

112 security_service) 

113 

114 self.assertEqual(expected_ldap_get_info, ldap_get_info) 

115 

116 @ddt.data( 

117 {'type': 'ldap'}, 

118 {'user': 'fake_user'}, 

119 {'password': 'fake_password'}, 

120 ) 

121 def test_ldap_get_info_exception(self, sec_service_data): 

122 self.assertRaises( 

123 exception.ShareBackendException, 

124 self.security_service_helper.ldap_get_info, 

125 sec_service_data 

126 ) 

127 

128 def test_ldap_retry_operation(self): 

129 mock_cmd = ["command", "to", "be", "executed"] 

130 

131 mock_execute = self.mock_object(self.security_service_helper, 

132 '_execute') 

133 

134 self.security_service_helper.ldap_retry_operation(mock_cmd, 

135 run_as_root=True) 

136 

137 mock_execute.assert_called_once_with(*mock_cmd, run_as_root=True) 

138 

139 def test_ldap_retry_operation_timeout(self): 

140 mock_cmd = ["command", "to", "be", "executed"] 

141 

142 mock_execute = self.mock_object( 

143 self.security_service_helper, '_execute', 

144 mock.Mock( 

145 side_effect=exception.ProcessExecutionError(exit_code=1))) 

146 

147 self.assertRaises( 

148 exception.ShareBackendException, 

149 self.security_service_helper.ldap_retry_operation, 

150 mock_cmd, 

151 run_as_root=False, 

152 timeout=10) 

153 

154 mock_execute.assert_has_calls([ 

155 mock.call(*mock_cmd, run_as_root=False), 

156 mock.call(*mock_cmd, run_as_root=False)]) 

157 

158 def test_ldap_retry_operation_invalid_credential(self): 

159 mock_cmd = ["command", "to", "be", "executed"] 

160 

161 mock_execute = self.mock_object( 

162 self.security_service_helper, '_execute', 

163 mock.Mock( 

164 side_effect=exception.ProcessExecutionError( 

165 exit_code=49))) 

166 

167 self.assertRaises( 

168 exception.ShareBackendException, 

169 self.security_service_helper.ldap_retry_operation, 

170 mock_cmd, 

171 run_as_root=False) 

172 

173 mock_execute.assert_called_once_with(*mock_cmd, run_as_root=False)