Coverage for manila/tests/scheduler/test_rpcapi.py: 97%

52 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2012, Red Hat, Inc. 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); you may 

4# not use this file except in compliance with the License. You may obtain 

5# a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

12# License for the specific language governing permissions and limitations 

13# under the License. 

14 

15""" 

16Unit Tests for manila.scheduler.rpcapi 

17""" 

18 

19import copy 

20from unittest import mock 

21 

22from oslo_config import cfg 

23 

24from manila import context 

25from manila.scheduler import rpcapi as scheduler_rpcapi 

26from manila import test 

27 

28CONF = cfg.CONF 

29 

30 

class SchedulerRpcAPITestCase(test.TestCase):
    """Tests for the client-side wrapper ``scheduler_rpcapi.SchedulerAPI``.

    Every test goes through :meth:`_test_scheduler_api`, which patches the
    RPC client held by the API object and checks how the wrapper drives it.
    """

    def _test_scheduler_api(self, method, rpc_method, fanout=False, **kwargs):
        """Call ``SchedulerAPI.<method>`` and verify the RPC it issues.

        :param method: name of the ``SchedulerAPI`` method to invoke; it is
            also the method name expected as the second positional argument
            of the RPC.
        :param rpc_method: name of the client attribute to patch and expect
            to be used, ``'call'`` or ``'cast'``.
        :param fanout: value expected for ``fanout`` whenever the API passes
            it to ``client.prepare()``.
        :param kwargs: keyword arguments forwarded to the API method. An
            optional ``version`` entry is removed first and used as the
            version expected in ``client.prepare()`` (default ``'1.0'``).
        """
        ctxt = context.RequestContext('fake_user', 'fake_project')
        rpcapi = scheduler_rpcapi.SchedulerAPI()

        # Only a blocking ``call`` hands a result back to the caller. The
        # decision must be keyed on the RPC verb, not on the API method
        # name, otherwise the return-value path is never exercised.
        expected_retval = 'foo' if rpc_method == 'call' else None

        expected_target = {
            'fanout': fanout,
            'version': kwargs.pop('version', '1.0'),
        }
        # Snapshot taken before the API runs so that later mutation of the
        # arguments cannot mask a mismatch.
        expected_msg = copy.deepcopy(kwargs)

        def _fake_prepare(*args, **prepare_kwargs):
            # Each keyword given to prepare() must match the expected
            # target; an unexpected keyword surfaces as a KeyError.
            for key, value in prepare_kwargs.items():
                self.assertEqual(expected_target[key], value)
            return rpcapi.client

        with mock.patch.object(rpcapi.client, 'prepare',
                               side_effect=_fake_prepare):
            with mock.patch.object(rpcapi.client, rpc_method,
                                   return_value=expected_retval) as mock_rpc:
                retval = getattr(rpcapi, method)(ctxt, **kwargs)

        self.assertEqual(expected_retval, retval)

        mock_rpc.assert_called_once()
        rpc_args, rpc_kwargs = mock_rpc.call_args

        # Positional part of the RPC: request context, then method name.
        self.assertEqual((ctxt, method), tuple(rpc_args[:2]))

        # Keyword part of the RPC: everything the test passed in must be
        # forwarded unchanged. This is a subset check on purpose, so that
        # any extra fields the API may add itself do not fail the test.
        for key, value in expected_msg.items():
            self.assertIn(key, rpc_kwargs)
            self.assertEqual(value, rpc_kwargs[key])

    def test_update_service_capabilities(self):
        self._test_scheduler_api('update_service_capabilities',
                                 rpc_method='cast',
                                 service_name='fake_name',
                                 host='fake_host',
                                 capabilities='fake_capabilities',
                                 fanout=True,
                                 version='1.10')

    def test_create_share_instance(self):
        self._test_scheduler_api('create_share_instance',
                                 rpc_method='cast',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.2')

    def test_get_pools(self):
        self._test_scheduler_api('get_pools',
                                 rpc_method='call',
                                 filters=None,
                                 version='1.9')

    def test_create_share_group(self):
        self._test_scheduler_api('create_share_group',
                                 rpc_method='cast',
                                 share_group_id='fake_share_group_id',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.8')

    def test_migrate_share_to_host(self):
        self._test_scheduler_api('migrate_share_to_host',
                                 rpc_method='cast',
                                 share_id='share_id',
                                 host='host',
                                 force_host_assisted_migration=True,
                                 preserve_metadata=True,
                                 writable=True,
                                 nondisruptive=False,
                                 preserve_snapshots=True,
                                 new_share_network_id='fake_net_id',
                                 new_share_type_id='fake_type_id',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.7')

    def test_create_share_replica(self):
        self._test_scheduler_api('create_share_replica',
                                 rpc_method='cast',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.5')

    def test_manage_share(self):
        self._test_scheduler_api('manage_share',
                                 rpc_method='cast',
                                 share_id='share_id',
                                 driver_options='fake_driver_options',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.6')

    def test_extend_share(self):
        self._test_scheduler_api('extend_share',
                                 rpc_method='cast',
                                 share_id='share_id',
                                 new_size='fake_size',
                                 reservations='fake_reservations',
                                 request_spec='fake_request_spec',
                                 filter_properties='filter_properties',
                                 version='1.11')