Coverage for manila/api/views/limits.py: 94%

56 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright 2010-2011 OpenStack LLC. 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import datetime 

17 

18from manila.api import common 

19from manila import utils 

20 

21 

class ViewBuilder(common.ViewBuilder):
    """View builder that renders rate and absolute limits for the API."""

    _collection_name = "limits"
    # Names of the versioned methods defined at the bottom of this class.
    # NOTE(review): presumably consumed by update_versioned_resource_dict()
    # to extend the name mapping per request microversion -- confirm in
    # manila.api.common.
    _detail_version_modifiers = [
        "add_share_replica_quotas",
        "add_share_group_quotas",
        "add_share_backup_quotas",
    ]

    def build(self, request, rate_limits, absolute_limits):
        """Return the full ``{"limits": {...}}`` response body.

        :param request: the API request, forwarded to the absolute-limit
            builder.
        :param rate_limits: iterable of raw rate-limit dicts.
        :param absolute_limits: dict of absolute limits, see
            ``_build_absolute_limits``.
        :returns: dict with a single "limits" key holding the "rate" and
            "absolute" views.
        """
        rate_view = self._build_rate_limits(rate_limits)
        absolute_view = self._build_absolute_limits(request, absolute_limits)
        return {
            "limits": {
                "rate": rate_view,
                "absolute": absolute_view,
            },
        }

    def _build_absolute_limits(self, request, absolute_limits):
        """Translate internal quota names into their API-facing names.

        absolute_limits is expected to be a dict of dicts, for example:
        {"limit": {"shares": 10, "gigabytes": 1024},
         "in_use": {"shares": 8, "gigabytes": 256}}.

        Entries whose value is None, or whose quota name has no known
        API name, are left out of the result.

        :returns: flat dict mapping API names (e.g. "maxTotalShares") to
            the corresponding values.
        """
        name_map = {
            "limit": {
                "gigabytes": ["maxTotalShareGigabytes"],
                "snapshot_gigabytes": ["maxTotalSnapshotGigabytes"],
                "shares": ["maxTotalShares"],
                "snapshots": ["maxTotalShareSnapshots"],
                "share_networks": ["maxTotalShareNetworks"],
            },
            "in_use": {
                "shares": ["totalSharesUsed"],
                "snapshots": ["totalShareSnapshotsUsed"],
                "share_networks": ["totalShareNetworksUsed"],
                "gigabytes": ["totalShareGigabytesUsed"],
                "snapshot_gigabytes": ["totalSnapshotGigabytesUsed"],
            },
        }
        # Must run before the loop below: the add_*_quotas methods of this
        # class mutate the mapping they are handed.
        self.update_versioned_resource_dict(request, name_map,
                                            absolute_limits)

        view = {}
        for section, api_names in name_map.items():
            for quota, value in absolute_limits.get(section, {}).items():
                if quota not in api_names or value is None:
                    continue
                for api_name in api_names[quota]:
                    view[api_name] = value
        return view

    def _build_rate_limits(self, rate_limits):
        """Group rate limits that share the same URI and regex.

        :param rate_limits: iterable of dicts carrying at least "URI" and
            "regex" plus the keys read by ``_build_rate_limit``.
        :returns: list of {"uri", "regex", "limit"} dicts in
            first-seen order, where "limit" lists every rendered rate
            limit for that URI/regex pair.
        """
        groups = []
        for raw in rate_limits:
            rendered = self._build_rate_limit(raw)
            uri, regex = raw["URI"], raw["regex"]

            # Reuse the group already created for this URI/regex pair.
            group = next(
                (candidate for candidate in groups
                 if candidate["uri"] == uri and candidate["regex"] == regex),
                None)
            if group is None:
                group = {"uri": uri, "regex": regex, "limit": []}
                groups.append(group)

            group["limit"].append(rendered)
        return groups

    def _build_rate_limit(self, rate_limit):
        """Render one raw rate-limit dict into its API representation.

        "resetTime" is read as a POSIX timestamp, converted to UTC, and
        stripped of tzinfo before being formatted by ``utils.isotime``.
        """
        reset_utc = datetime.datetime.fromtimestamp(
            rate_limit["resetTime"], tz=datetime.timezone.utc)
        reset_naive = reset_utc.replace(tzinfo=None)
        return {
            "verb": rate_limit["verb"],
            "value": rate_limit["value"],
            "remaining": int(rate_limit["remaining"]),
            "unit": rate_limit["unit"],
            "next-available": utils.isotime(at=reset_naive),
        }

    @common.ViewBuilder.versioned_method("2.58")
    def add_share_group_quotas(self, request, limit_names, absolute_limits):
        """Add the share group quota names to ``limit_names`` in place."""
        limit_names["limit"].update({
            "share_groups": ["maxTotalShareGroups"],
            "share_group_snapshots": ["maxTotalShareGroupSnapshots"],
        })
        limit_names["in_use"].update({
            "share_groups": ["totalShareGroupsUsed"],
            "share_group_snapshots": ["totalShareGroupSnapshotsUsed"],
        })

    @common.ViewBuilder.versioned_method("2.53")
    def add_share_replica_quotas(self, request, limit_names, absolute_limits):
        """Add the share replica quota names to ``limit_names`` in place."""
        limit_names["limit"].update({
            "share_replicas": ["maxTotalShareReplicas"],
            "replica_gigabytes": ["maxTotalReplicaGigabytes"],
        })
        limit_names["in_use"].update({
            "share_replicas": ["totalShareReplicasUsed"],
            "replica_gigabytes": ["totalReplicaGigabytesUsed"],
        })

    @common.ViewBuilder.versioned_method("2.80")
    def add_share_backup_quotas(self, request, limit_names, absolute_limits):
        """Add the share backup quota names to ``limit_names`` in place."""
        limit_names["limit"].update({
            "backups": ["maxTotalShareBackups"],
            "backup_gigabytes": ["maxTotalBackupGigabytes"],
        })
        limit_names["in_use"].update({
            "backups": ["totalShareBackupsUsed"],
            "backup_gigabytes": ["totalBackupGigabytesUsed"],
        })