Coverage for manila/tests/fake_notifier.py: 92%
30 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2014 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15import collections
16import functools
18import oslo_messaging as messaging
19from oslo_serialization import jsonutils
21from manila import rpc
23NOTIFICATIONS = []
26def reset():
27 del NOTIFICATIONS[:]
30FakeMessage = collections.namedtuple(
31 'Message',
32 ['publisher_id', 'priority', 'event_type', 'payload'],
33)
36class FakeNotifier(object):
38 def __init__(self, transport, publisher_id=None, serializer=None):
39 self.transport = transport
40 self.publisher_id = publisher_id
41 for priority in ['debug', 'info', 'warn', 'error', 'critical']:
42 setattr(self, priority,
43 functools.partial(self._notify, priority.upper()))
44 self._serializer = serializer or messaging.serializer.NoOpSerializer()
46 def prepare(self, publisher_id=None):
47 if publisher_id is None: 47 ↛ 48line 47 didn't jump to line 48 because the condition on line 47 was never true
48 publisher_id = self.publisher_id
49 return self.__class__(self.transport, publisher_id, self._serializer)
51 def _notify(self, priority, ctxt, event_type, payload):
52 payload = self._serializer.serialize_entity(ctxt, payload)
53 # NOTE(sileht): simulate the kombu serializer
54 # this permit to raise an exception if something have not
55 # been serialized correctly
56 jsonutils.to_primitive(payload)
57 msg = dict(publisher_id=self.publisher_id,
58 priority=priority,
59 event_type=event_type,
60 payload=payload)
61 NOTIFICATIONS.append(msg)
64def stub_notifier(testcase):
65 testcase.mock_object(messaging, 'Notifier', FakeNotifier)
66 if rpc.NOTIFIER: 66 ↛ exitline 66 didn't return from function 'stub_notifier' because the condition on line 66 was always true
67 serializer = getattr(rpc.NOTIFIER, '_serializer', None)
68 testcase.mock_object(rpc, 'NOTIFIER',
69 FakeNotifier(rpc.NOTIFIER.transport,
70 rpc.NOTIFIER.publisher_id,
71 serializer=serializer))