Coverage for manila/tests/data/test_helper.py: 100%
162 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2015 Hitachi Data Systems inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import os
17from unittest import mock
19import ddt
21from manila.common import constants
22from manila import context
23from manila.data import helper as data_copy_helper
24from manila import db
25from manila import exception
26from manila.share import rpcapi as share_rpc
27from manila import test
28from manila.tests import db_utils
29from manila import utils
@ddt.ddt
class DataServiceHelperTestCase(test.TestCase):
    """Tests DataServiceHelper."""

    def setUp(self):
        """Create the share, share instance, access rule and helper."""
        super().setUp()
        self.share = db_utils.create_share()
        self.share_instance = db_utils.create_share_instance(
            share_id=self.share['id'],
            status=constants.STATUS_AVAILABLE)
        self.context = context.get_admin_context()
        # Re-read the instance so that it also carries the share data.
        self.share_instance = db.share_instance_get(
            self.context, self.share_instance['id'], with_share_data=True)
        self.access = db_utils.create_access(share_id=self.share['id'])
        self.helper = data_copy_helper.DataServiceHelper(
            self.context, db, self.share)

    @ddt.data(True, False)
    def test_allow_access_to_data_service(self, allow_dest_instance):
        """Check the calls made when access is allowed to the data service.

        :param allow_dest_instance: when True, a destination instance and
            its connection info are passed to the helper as well.
        """
        access = db_utils.create_access(share_id=self.share['id'])
        info_src = {
            'access_mapping': {
                'ip': ['nfs'],
                'user': ['cifs', 'nfs'],
            }
        }
        info_dest = {
            'access_mapping': {
                'ip': ['nfs', 'cifs'],
                'user': ['cifs'],
            }
        }
        if allow_dest_instance:
            # Only the protocols listed in both mappings above are expected.
            mapping = {'ip': ['nfs'], 'user': ['cifs']}
        else:
            mapping = info_src['access_mapping']

        fake_access = {
            'access_to': 'fake_ip',
            'access_level': constants.ACCESS_LEVEL_RW,
            'access_type': 'ip',
        }
        # Build the expected DB values as a separate dict so that the entry
        # returned by the mocked mapping lookup is not mutated by the test.
        access_values = dict(fake_access, share_id=self.share['id'])

        # mocks
        self.mock_object(
            self.helper, '_get_access_entries_according_to_mapping',
            mock.Mock(return_value=[fake_access]))
        self.mock_object(
            self.helper.db, 'share_access_get_all_by_type_and_access',
            mock.Mock(return_value=[access]))
        change_data_access_call = self.mock_object(
            self.helper, '_change_data_access_to_instance')
        self.mock_object(self.helper.db, 'share_instance_access_create',
                         mock.Mock(return_value=access))

        # run
        if allow_dest_instance:
            result = self.helper.allow_access_to_data_service(
                self.share_instance, info_src, self.share_instance, info_dest)
        else:
            result = self.helper.allow_access_to_data_service(
                self.share_instance, info_src)

        # asserts
        self.assertEqual([access], result)

        get_entries = self.helper._get_access_entries_according_to_mapping
        get_entries.assert_called_once_with(mapping)
        get_old_rules = self.helper.db.share_access_get_all_by_type_and_access
        get_old_rules.assert_called_once_with(
            self.context, self.share['id'], fake_access['access_type'],
            fake_access['access_to'])

        create_call = mock.call(
            self.context, access_values, self.share_instance['id'])
        update_call = mock.call(self.share_instance)
        access_create_calls = [create_call]
        change_access_calls = [
            mock.call(self.share_instance, [access], deny=True),
            update_call,
        ]
        if allow_dest_instance:
            # The same instance is used as destination, so each call is
            # expected one more time with identical arguments.
            access_create_calls.append(create_call)
            change_access_calls.append(update_call)

        self.helper.db.share_instance_access_create.assert_has_calls(
            access_create_calls)
        self.assertEqual(len(change_access_calls),
                         change_data_access_call.call_count)
        change_data_access_call.assert_has_calls(change_access_calls)

    @ddt.data({'ip': []}, {'cert': []}, {'user': []}, {'cephx': []}, {'x': []})
    def test__get_access_entries_according_to_mapping(self, mapping):
        """Check entries built for each access type in the mapping.

        :param mapping: single-key access mapping; 'ip', 'user' and 'cert'
            are expected to yield an entry, any other key is expected to
            raise ShareDataCopyFailed.
        """
        # NOTE(review): these options are assigned directly on CONF and are
        # not reset inside this test - confirm the base class restores them.
        data_copy_helper.CONF.data_node_access_cert = 'fake'
        data_copy_helper.CONF.data_node_access_ips = 'fake'
        data_copy_helper.CONF.data_node_access_admin_user = 'fake'

        access_type = next(iter(mapping))
        expected = [{
            'access_type': access_type,
            'access_level': constants.ACCESS_LEVEL_RW,
            'access_to': 'fake',
        }]

        if access_type in ('ip', 'user', 'cert'):
            result = self.helper._get_access_entries_according_to_mapping(
                mapping)
            self.assertEqual(expected, result)
        else:
            self.assertRaises(
                exception.ShareDataCopyFailed,
                self.helper._get_access_entries_according_to_mapping, mapping)

    def test__get_access_entries_according_to_mapping_exception_not_set(self):
        """Check ShareDataCopyFailed is raised when no access IP is set."""
        data_copy_helper.CONF.data_node_access_ips = None

        self.assertRaises(
            exception.ShareDataCopyFailed,
            self.helper._get_access_entries_according_to_mapping, {'ip': []})

    def test__get_access_entries_according_to_mapping_ip_list(self):
        """Check one 'ip' entry is returned per configured access IP."""
        ips = ['fake1', 'fake2']
        data_copy_helper.CONF.data_node_access_ips = ips

        expected = [
            {
                'access_type': 'ip',
                'access_level': constants.ACCESS_LEVEL_RW,
                'access_to': ip,
            }
            for ip in ips
        ]

        result = self.helper._get_access_entries_according_to_mapping(
            {'ip': []})
        self.assertEqual(expected, result)

    def test_deny_access_to_data_service(self):
        """Check deny delegates to _change_data_access_to_instance."""
        # mocks
        self.mock_object(self.helper, '_change_data_access_to_instance')

        # run
        self.helper.deny_access_to_data_service(
            [self.access], self.share_instance['id'])

        # asserts
        self.helper._change_data_access_to_instance.assert_called_once_with(
            self.share_instance['id'], [self.access], deny=True)

    @ddt.data(None, Exception('fake'))
    def test_cleanup_data_access(self, exc):
        """Check cleanup denies access and warns when the deny call fails.

        :param exc: side effect of the mocked deny call; None means the
            call succeeds.
        """
        # mocks
        self.mock_object(self.helper, 'deny_access_to_data_service',
                         mock.Mock(side_effect=exc))
        self.mock_object(data_copy_helper.LOG, 'warning')

        # run
        self.helper.cleanup_data_access([self.access],
                                        self.share_instance['id'])

        # asserts
        self.helper.deny_access_to_data_service.assert_called_once_with(
            [self.access], self.share_instance['id'])

        if exc:
            self.assertTrue(data_copy_helper.LOG.warning.called)

    @ddt.data(False, True)
    def test_cleanup_temp_folder(self, exc):
        """Check the temp folder is removed and leftovers are reported.

        :param exc: result of the third os.path.exists call; when True a
            warning is expected to be logged.
        """
        fake_path = '/fake_path/' + self.share_instance['id']

        # mocks
        self.mock_object(os.path, 'exists',
                         mock.Mock(side_effect=[True, True, exc]))
        self.mock_object(os, 'rmdir')
        self.mock_object(data_copy_helper.LOG, 'warning')

        # run
        self.helper.cleanup_temp_folder(
            '/fake_path/', self.share_instance['id'])

        # asserts
        os.rmdir.assert_called_once_with(fake_path)
        os.path.exists.assert_has_calls([mock.call(fake_path)] * 3)

        if exc:
            self.assertTrue(data_copy_helper.LOG.warning.called)

    @ddt.data(None, Exception('fake'))
    def test_cleanup_unmount_temp_folder(self, exc):
        """Check cleanup unmounts and warns when the unmount call fails.

        :param exc: side effect of the mocked unmount call; None means the
            call succeeds.
        """
        # mocks
        self.mock_object(self.helper, 'unmount_share_instance_or_backup',
                         mock.Mock(side_effect=exc))
        self.mock_object(data_copy_helper.LOG, 'warning')

        unmount_info = {
            'unmount': 'unmount_template',
            'share_instance_id': self.share_instance['id']
        }

        # run
        self.helper.cleanup_unmount_temp_folder(unmount_info, 'fake_path')

        # asserts
        self.helper.unmount_share_instance_or_backup.assert_called_once_with(
            unmount_info, 'fake_path')

        if exc:
            self.assertTrue(data_copy_helper.LOG.warning.called)

    @ddt.data(True, False)
    def test__change_data_access_to_instance(self, deny):
        """Check the calls made when instance access rules are changed.

        :param deny: when True the rule is expected to be queued for
            denial; otherwise no rule update is expected.
        """
        access_rule = db_utils.create_access(share_id=self.share['id'])
        access_rule = db.share_instance_access_get(
            self.context, access_rule['id'], self.share_instance['id'])

        # mocks
        self.mock_object(share_rpc.ShareAPI, 'update_access')
        self.mock_object(utils, 'wait_for_access_update')
        mock_access_rules_status_update = self.mock_object(
            self.helper.access_helper,
            'get_and_update_share_instance_access_rules_status')
        mock_rules_update = self.mock_object(
            self.helper.access_helper,
            'get_and_update_share_instance_access_rules')

        # run
        self.helper._change_data_access_to_instance(
            self.share_instance, access_rule, deny=deny)

        # asserts
        if deny:
            mock_rules_update.assert_called_once_with(
                self.context, share_instance_id=self.share_instance['id'],
                filters={'access_id': [access_rule['id']]},
                updates={'state': constants.ACCESS_STATE_QUEUED_TO_DENY})
        else:
            self.assertFalse(mock_rules_update.called)

        share_rpc.ShareAPI.update_access.assert_called_once_with(
            self.context, self.share_instance)
        mock_access_rules_status_update.assert_called_once_with(
            self.context, status=constants.SHARE_INSTANCE_RULES_SYNCING,
            share_instance_id=self.share_instance['id'])
        utils.wait_for_access_update.assert_called_once_with(
            self.context, self.helper.db, self.share_instance,
            data_copy_helper.CONF.data_access_wait_access_rules_timeout)

    @ddt.data('migration', 'backup', 'restore')
    def test_mount_share_instance_or_backup(self, op):
        """Check folder creation and the mount command for each operation.

        :param op: which kind of mount info is passed to the helper:
            'migration', 'backup' or 'restore'.
        """
        # mocks
        self.mock_object(utils, 'execute')
        # Successive results of os.path.exists; every entry is a plain bool.
        exists_calls = [False, True]
        if op == 'backup':
            exists_calls.extend([False, True])
        elif op == 'restore':
            exists_calls.append(True)
        self.mock_object(os.path, 'exists',
                         mock.Mock(side_effect=exists_calls))
        self.mock_object(os, 'makedirs')

        mount_info = {'mount': 'mount %(path)s'}
        if op == 'migration':
            share_instance_id = self.share_instance['id']
            fake_path = '/fake_path/' + share_instance_id
            mount_info.update({'share_instance_id': share_instance_id})
        else:
            fake_path = '/fake_backup_path/'
            mount_info.update(
                {'backup_id': 'fake_backup_id',
                 'mount_point': '/fake_backup_path/', op: True})

        # run
        self.helper.mount_share_instance_or_backup(mount_info, '/fake_path')

        # asserts
        utils.execute.assert_called_once_with('mount', fake_path,
                                              run_as_root=True)

        backup_path = fake_path + 'fake_backup_id'
        if op == 'migration':
            os.makedirs.assert_called_once_with(fake_path)
            os.path.exists.assert_has_calls([
                mock.call(fake_path),
                mock.call(fake_path),
            ])
        elif op == 'backup':
            os.makedirs.assert_has_calls([
                mock.call(fake_path),
                mock.call(backup_path)
            ])
            os.path.exists.assert_has_calls([
                mock.call(fake_path),
                mock.call(fake_path),
                mock.call(backup_path),
                mock.call(backup_path),
            ])
        elif op == 'restore':
            os.makedirs.assert_called_once_with(fake_path)
            os.path.exists.assert_has_calls([
                mock.call(fake_path),
                mock.call(fake_path),
                mock.call(backup_path),
            ])

    @ddt.data([True, True], [True, False], [True, Exception('fake')])
    def test_unmount_share_instance_or_backup(self, side_effect):
        """Check the unmount command and the temp folder removal.

        :param side_effect: successive results (or exception) produced by
            the mocked os.path.exists.
        """
        fake_path = '/fake_path/' + self.share_instance['id']

        # mocks
        self.mock_object(utils, 'execute')
        self.mock_object(os.path, 'exists', mock.Mock(
            side_effect=side_effect))
        self.mock_object(os, 'rmdir')
        self.mock_object(data_copy_helper.LOG, 'warning')

        unmount_info = {
            'unmount': 'unmount %(path)s',
            'share_instance_id': self.share_instance['id']
        }

        # run
        self.helper.unmount_share_instance_or_backup(
            unmount_info, '/fake_path')

        # asserts
        utils.execute.assert_called_once_with('unmount', fake_path,
                                              run_as_root=True)
        os.rmdir.assert_called_once_with(fake_path)
        os.path.exists.assert_has_calls([
            mock.call(fake_path),
            mock.call(fake_path),
        ])

        if any(isinstance(x, Exception) for x in side_effect):
            self.assertTrue(data_copy_helper.LOG.warning.called)