Coverage for manila/tests/scheduler/test_rpcapi.py: 97%
52 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2012, Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15"""
16Unit Tests for manila.scheduler.rpcapi
17"""
19import copy
20from unittest import mock
22from oslo_config import cfg
24from manila import context
25from manila.scheduler import rpcapi as scheduler_rpcapi
26from manila import test
28CONF = cfg.CONF
class SchedulerRpcAPITestCase(test.TestCase):
    """Check the RPC requests issued by ``scheduler_rpcapi.SchedulerAPI``."""

    def tearDown(self):
        super(SchedulerRpcAPITestCase, self).tearDown()

    def _test_scheduler_api(self, method, rpc_method, fanout=False, **kwargs):
        """Call ``method`` on a SchedulerAPI whose RPC client is mocked.

        :param method: name of the ``SchedulerAPI`` method under test.
        :param rpc_method: name of the client attribute that is patched and
            expected to carry the request (the tests here use ``'cast'`` or
            ``'call'``).
        :param fanout: value that ``prepare()`` must receive if it is given
            a ``fanout`` keyword.
        :param kwargs: keyword arguments forwarded to ``method``. An optional
            ``version`` entry is removed first and used as the value that
            ``prepare()`` must receive if it is given a ``version`` keyword
            (``'1.0'`` when omitted).
        """
        ctxt = context.RequestContext('fake_user', 'fake_project')
        rpcapi = scheduler_rpcapi.SchedulerAPI()
        # The expectation depends on the RPC primitive, not on the API method
        # name: ``method`` is never 'call', so comparing against it left the
        # return-value path of the fake untested.
        expected_retval = 'foo' if rpc_method == 'call' else None

        # 'version' must be popped before kwargs is copied and forwarded, so
        # that it is neither part of the expected message nor passed on to
        # the API method.
        target = {
            "fanout": fanout,
            "version": kwargs.pop('version', '1.0'),
        }
        expected_msg = copy.deepcopy(kwargs)

        self.fake_args = None
        self.fake_kwargs = None

        def _fake_prepare_method(*args, **kwds):
            # Every keyword handed to prepare() must match the expected
            # target; an unexpected keyword raises KeyError and fails the
            # test.
            for kwd in kwds:
                self.assertEqual(target[kwd], kwds[kwd])
            # Return the client itself so the following RPC invocation hits
            # the patched ``rpc_method`` below.
            return rpcapi.client

        def _fake_rpc_method(*args, **kwargs):
            # Record what the API handed to the RPC layer for later checks.
            self.fake_args = args
            self.fake_kwargs = kwargs
            return expected_retval

        with mock.patch.object(rpcapi.client, "prepare") as mock_prepared:
            mock_prepared.side_effect = _fake_prepare_method

            with mock.patch.object(rpcapi.client, rpc_method) as mock_method:
                mock_method.side_effect = _fake_rpc_method
                retval = getattr(rpcapi, method)(ctxt, **kwargs)
                self.assertEqual(expected_retval, retval)
                expected_args = [ctxt, method, expected_msg]
                # zip() stops at the shorter sequence, so only as many
                # leading positional arguments as the RPC method actually
                # received are compared.
                for arg, expected_arg in zip(self.fake_args, expected_args):
                    self.assertEqual(expected_arg, arg)

    def test_update_service_capabilities(self):
        self._test_scheduler_api('update_service_capabilities',
                                 rpc_method='cast',
                                 service_name='fake_name',
                                 host='fake_host',
                                 capabilities='fake_capabilities',
                                 fanout=True,
                                 version='1.10')

    def test_create_share_instance(self):
        self._test_scheduler_api('create_share_instance',
                                 rpc_method='cast',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.2')

    def test_get_pools(self):
        self._test_scheduler_api('get_pools',
                                 rpc_method='call',
                                 filters=None,
                                 version='1.9')

    def test_create_share_group(self):
        self._test_scheduler_api('create_share_group',
                                 rpc_method='cast',
                                 share_group_id='fake_share_group_id',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.8')

    def test_migrate_share_to_host(self):
        self._test_scheduler_api('migrate_share_to_host',
                                 rpc_method='cast',
                                 share_id='share_id',
                                 host='host',
                                 force_host_assisted_migration=True,
                                 preserve_metadata=True,
                                 writable=True,
                                 nondisruptive=False,
                                 preserve_snapshots=True,
                                 new_share_network_id='fake_net_id',
                                 new_share_type_id='fake_type_id',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.7')

    def test_create_share_replica(self):
        self._test_scheduler_api('create_share_replica',
                                 rpc_method='cast',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.5')

    def test_manage_share(self):
        self._test_scheduler_api('manage_share',
                                 rpc_method='cast',
                                 share_id='share_id',
                                 driver_options='fake_driver_options',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.6')

    def test_extend_share(self):
        self._test_scheduler_api('extend_share',
                                 rpc_method='cast',
                                 share_id='share_id',
                                 new_size='fake_size',
                                 reservations='fake_reservations',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.11',)