Coverage for manila/tests/data/test_rpcapi.py: 97%
56 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2015, Hitachi Data Systems.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15"""
16Unit Tests for manila.data.rpcapi
17"""
19import copy
20from unittest import mock
22from oslo_config import cfg
23from oslo_serialization import jsonutils
25from manila.common import constants
26from manila import context
27from manila.data import rpcapi as data_rpcapi
28from manila import test
29from manila.tests import db_utils
# Module-level shorthand for oslo.config's global configuration object;
# setUp() below reads ``storage_availability_zone`` from it.
31CONF = cfg.CONF
class DataRpcAPITestCase(test.TestCase):
    """Unit tests for the client-side ``manila.data.rpcapi.DataAPI``.

    Each ``test_*`` method delegates to :meth:`_test_data_api`, which patches
    the RPC client held by a fresh ``DataAPI`` instance, invokes one API
    method and checks what was handed to the client.
    """

    def setUp(self):
        """Create an available share and a backup of it as test fixtures."""
        super().setUp()
        share = db_utils.create_share(
            availability_zone=CONF.storage_availability_zone,
            status=constants.STATUS_AVAILABLE,
        )
        # Tests only need the share's ``id``; keep the primitive form.
        self.fake_share = jsonutils.to_primitive(share)
        self.backup = db_utils.create_backup(
            share_id=self.fake_share['id'],
            status=constants.STATUS_AVAILABLE,
        )

    def _test_data_api(self, method, rpc_method, fanout=False, **kwargs):
        """Invoke ``DataAPI.<method>`` against a patched RPC client.

        :param method: name of the ``DataAPI`` method to call.
        :param rpc_method: name of the client attribute to patch; the tests
            in this class pass ``'cast'`` or ``'call'``.
        :param fanout: value expected for ``fanout`` if the API method passes
            that keyword to ``client.prepare``.
        :param kwargs: keyword arguments forwarded to the API method. An
            optional ``version`` entry is popped first and used as the value
            expected for ``version`` in ``client.prepare`` (default
            ``'1.0'``).
        """
        ctxt = context.RequestContext('fake_user', 'fake_project')
        rpcapi = data_rpcapi.DataAPI()
        # NOTE(review): this compares ``method`` (an API method name such as
        # 'migration_start'), not ``rpc_method``, so with the callers in this
        # class it is always None and the 'foo' return path is never taken.
        # ``rpc_method`` may have been intended -- confirm which DataAPI
        # methods actually return the RPC result before changing it.
        expected_retval = 'foo' if method == 'call' else None

        # ``version`` is test metadata, not an argument of the API method, so
        # it must be removed from kwargs before they are forwarded.
        target = {
            "fanout": fanout,
            "version": kwargs.pop('version', '1.0'),
        }
        expected_msg = copy.deepcopy(kwargs)

        self.fake_args = None
        self.fake_kwargs = None

        def _fake_prepare_method(*args, **kwds):
            # Every keyword given to prepare() must match ``target``; a
            # keyword absent from ``target`` raises KeyError.
            for kwd, value in kwds.items():
                self.assertEqual(target[kwd], value)
            return rpcapi.client

        def _fake_rpc_method(*args, **kwargs):
            # Record what the API method sent so it can be inspected below.
            self.fake_args = args
            self.fake_kwargs = kwargs
            return expected_retval

        with mock.patch.object(rpcapi.client, "prepare",
                               side_effect=_fake_prepare_method), \
                mock.patch.object(rpcapi.client, rpc_method,
                                  side_effect=_fake_rpc_method):
            retval = getattr(rpcapi, method)(ctxt, **kwargs)

        self.assertEqual(expected_retval, retval)
        # Fail with a readable message (instead of a TypeError from zip())
        # when the API method never reached the patched RPC method.
        self.assertIsNotNone(
            self.fake_args,
            f"DataAPI.{method} never invoked client.{rpc_method}")

        expected_args = [ctxt, method, expected_msg]
        # NOTE(review): zip() stops at the shorter sequence, so
        # ``expected_msg`` is only compared if the RPC method received a
        # third positional argument; ``self.fake_kwargs`` is captured but
        # not checked here.
        for arg, expected_arg in zip(self.fake_args, expected_args):
            self.assertEqual(expected_arg, arg)

    def test_migration_start(self):
        self._test_data_api(
            'migration_start',
            rpc_method='cast',
            version='1.0',
            share_id=self.fake_share['id'],
            ignore_list=[],
            share_instance_id='fake_ins_id',
            dest_share_instance_id='dest_fake_ins_id',
            connection_info_src={},
            connection_info_dest={},
        )

    def test_data_copy_cancel(self):
        self._test_data_api(
            'data_copy_cancel',
            rpc_method='call',
            version='1.0',
            share_id=self.fake_share['id'],
        )

    def test_data_copy_get_progress(self):
        self._test_data_api(
            'data_copy_get_progress',
            rpc_method='call',
            version='1.0',
            share_id=self.fake_share['id'],
        )

    def test_create_backup(self):
        self._test_data_api(
            'create_backup',
            rpc_method='cast',
            version='1.1',
            backup=self.backup,
        )

    def test_delete_backup(self):
        self._test_data_api(
            'delete_backup',
            rpc_method='cast',
            version='1.1',
            backup=self.backup,
        )

    def test_restore_backup(self):
        self._test_data_api(
            'restore_backup',
            rpc_method='cast',
            version='1.1',
            backup=self.backup,
            share_id=self.fake_share['id'],
        )