Coverage for manila/tests/share/drivers/container/test_security_service_helper.py: 100%
65 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2021 NetApp, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15"""Unit tests for the Security Service helper module."""
17from unittest import mock
19import ddt
21from manila import exception
22from manila.share import configuration
23from manila.share.drivers.container import security_service_helper
24from manila import test
25from manila.tests import db_utils
28INVALID_CREDENTIALS_EXIT_CODE = 49
31@ddt.ddt
32class SecurityServiceHelperTestCase(test.TestCase):
33 """Tests DockerExecHelper"""
35 def setUp(self):
36 super(SecurityServiceHelperTestCase, self).setUp()
37 self.fake_conf = configuration.Configuration(None)
38 self.fake_conf.container_image_name = "fake_image"
39 self.fake_conf.container_volume_mount_path = "/tmp/shares"
40 self.security_service_helper = (
41 security_service_helper.SecurityServiceHelper(
42 configuration=self.fake_conf))
44 def test_setup_security_service(self):
45 share_server = db_utils.create_share_server()
46 security_service = db_utils.create_security_service()
48 mock_ldap_bind = self.mock_object(
49 self.security_service_helper, 'ldap_bind')
51 self.security_service_helper.setup_security_service(
52 share_server['id'], security_service)
54 mock_ldap_bind.assert_called_once_with(
55 share_server['id'], security_service)
57 def test_update_security_service(self):
58 share_server = db_utils.create_share_server()
59 current_security_service = db_utils.create_security_service()
60 new_security_service = db_utils.create_security_service()
62 mock_ldap_bind = self.mock_object(
63 self.security_service_helper, 'ldap_bind')
65 self.security_service_helper.update_security_service(
66 share_server['id'], current_security_service, new_security_service)
68 mock_ldap_bind.assert_called_once_with(
69 share_server['id'], new_security_service)
71 def _setup_test_ldap_bind_tests(self):
72 share_server = db_utils.create_security_service()
73 security_service = db_utils.create_security_service()
74 ldap_get_info = {
75 'ss_password': security_service['password'],
76 'ss_user': security_service['user']
77 }
78 expected_cmd = [
79 "docker", "exec", "%s" % share_server['id'], "ldapwhoami", "-x",
80 "-H", "ldap://localhost:389", "-D",
81 "cn=%s,dc=example,dc=com" % ldap_get_info[
82 "ss_user"],
83 "-w", "%s" % ldap_get_info["ss_password"]]
85 return share_server, security_service, ldap_get_info, expected_cmd
87 def test_ldap_bind(self):
88 share_server, security_service, ldap_get_info, expected_cmd = (
89 self._setup_test_ldap_bind_tests())
91 mock_ldap_get_info = self.mock_object(
92 self.security_service_helper, 'ldap_get_info',
93 mock.Mock(return_value=ldap_get_info))
94 mock_ldap_retry_operation = self.mock_object(
95 self.security_service_helper, 'ldap_retry_operation')
97 self.security_service_helper.ldap_bind(
98 share_server['id'], security_service)
100 mock_ldap_get_info.assert_called_once_with(security_service)
101 mock_ldap_retry_operation.assert_called_once_with(expected_cmd,
102 run_as_root=True)
104 def test_ldap_get_info(self):
105 security_service = db_utils.create_security_service()
106 expected_ldap_get_info = {
107 'ss_password': security_service['password'],
108 'ss_user': security_service['user']
109 }
111 ldap_get_info = self.security_service_helper.ldap_get_info(
112 security_service)
114 self.assertEqual(expected_ldap_get_info, ldap_get_info)
116 @ddt.data(
117 {'type': 'ldap'},
118 {'user': 'fake_user'},
119 {'password': 'fake_password'},
120 )
121 def test_ldap_get_info_exception(self, sec_service_data):
122 self.assertRaises(
123 exception.ShareBackendException,
124 self.security_service_helper.ldap_get_info,
125 sec_service_data
126 )
128 def test_ldap_retry_operation(self):
129 mock_cmd = ["command", "to", "be", "executed"]
131 mock_execute = self.mock_object(self.security_service_helper,
132 '_execute')
134 self.security_service_helper.ldap_retry_operation(mock_cmd,
135 run_as_root=True)
137 mock_execute.assert_called_once_with(*mock_cmd, run_as_root=True)
139 def test_ldap_retry_operation_timeout(self):
140 mock_cmd = ["command", "to", "be", "executed"]
142 mock_execute = self.mock_object(
143 self.security_service_helper, '_execute',
144 mock.Mock(
145 side_effect=exception.ProcessExecutionError(exit_code=1)))
147 self.assertRaises(
148 exception.ShareBackendException,
149 self.security_service_helper.ldap_retry_operation,
150 mock_cmd,
151 run_as_root=False,
152 timeout=10)
154 mock_execute.assert_has_calls([
155 mock.call(*mock_cmd, run_as_root=False),
156 mock.call(*mock_cmd, run_as_root=False)])
158 def test_ldap_retry_operation_invalid_credential(self):
159 mock_cmd = ["command", "to", "be", "executed"]
161 mock_execute = self.mock_object(
162 self.security_service_helper, '_execute',
163 mock.Mock(
164 side_effect=exception.ProcessExecutionError(
165 exit_code=49)))
167 self.assertRaises(
168 exception.ShareBackendException,
169 self.security_service_helper.ldap_retry_operation,
170 mock_cmd,
171 run_as_root=False)
173 mock_execute.assert_called_once_with(*mock_cmd, run_as_root=False)