Coverage for manila/api/views/limits.py: 94%
56 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2010-2011 OpenStack LLC.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import datetime
18from manila.api import common
19from manila import utils
class ViewBuilder(common.ViewBuilder):
    """View builder that renders rate and absolute limits for the API."""

    _collection_name = "limits"
    _detail_version_modifiers = [
        "add_share_replica_quotas",
        "add_share_group_quotas",
        "add_share_backup_quotas",
    ]

    def build(self, request, rate_limits, absolute_limits):
        """Assemble the full ``limits`` response body.

        :param request: API request, forwarded to the absolute limits
            builder.
        :param rate_limits: iterable of raw rate limit dicts.
        :param absolute_limits: dict of absolute limits, see
            ``_build_absolute_limits``.
        :returns: ``{"limits": {"rate": [...], "absolute": {...}}}``.
        """
        rate_view = self._build_rate_limits(rate_limits)
        absolute_view = self._build_absolute_limits(request, absolute_limits)
        return {"limits": {"rate": rate_view, "absolute": absolute_view}}

    def _build_absolute_limits(self, request, absolute_limits):
        """Translate internal quota names into API-facing limit names.

        ``absolute_limits`` is expected to be a dict with optional
        ``"limit"`` and ``"in_use"`` sections, for example::

            {"limit": {"shares": 10, "gigabytes": 1024},
             "in_use": {"shares": 8, "gigabytes": 256}}

        Resources without a known API name and resources whose value is
        ``None`` are left out of the result.

        :returns: flat dict mapping API limit names to their values.
        """
        limit_names = {
            "limit": {
                "gigabytes": ["maxTotalShareGigabytes"],
                "snapshot_gigabytes": ["maxTotalSnapshotGigabytes"],
                "shares": ["maxTotalShares"],
                "snapshots": ["maxTotalShareSnapshots"],
                "share_networks": ["maxTotalShareNetworks"],
            },
            "in_use": {
                "shares": ["totalSharesUsed"],
                "snapshots": ["totalShareSnapshotsUsed"],
                "share_networks": ["totalShareNetworksUsed"],
                "gigabytes": ["totalShareGigabytesUsed"],
                "snapshot_gigabytes": ["totalSnapshotGigabytesUsed"],
            },
        }
        # NOTE(review): presumably this runs the add_*_quotas modifiers
        # that apply to the request's API version, extending limit_names
        # in place -- confirm against common.ViewBuilder.
        self.update_versioned_resource_dict(request, limit_names,
                                            absolute_limits)

        translated = {}
        for section, name_map in limit_names.items():
            reported = absolute_limits.get(section, {})
            for resource, amount in reported.items():
                if resource not in name_map or amount is None:
                    continue
                # One internal resource may be exposed under several names.
                translated.update(
                    (api_name, amount) for api_name in name_map[resource])
        return translated

    def _build_rate_limits(self, rate_limits):
        """Group rate limits that share the same URI and regex.

        :param rate_limits: iterable of raw rate limit dicts carrying at
            least the ``"URI"`` and ``"regex"`` keys plus the keys read by
            ``_build_rate_limit``.
        :returns: list of ``{"uri", "regex", "limit": [...]}`` dicts, in
            order of first appearance of each (URI, regex) pair.
        """
        grouped = []
        for raw in rate_limits:
            entry = self._build_rate_limit(raw)

            # Reuse the group created for an earlier limit with the same
            # URI/regex pair, if there is one.
            group = next(
                (candidate for candidate in grouped
                 if (candidate["uri"] == raw["URI"] and
                     candidate["regex"] == raw["regex"])),
                None)

            if group is None:
                group = {
                    "uri": raw["URI"],
                    "regex": raw["regex"],
                    "limit": [],
                }
                grouped.append(group)

            group["limit"].append(entry)

        return grouped

    def _build_rate_limit(self, rate_limit):
        """Render a single raw rate limit dict for the API response.

        ``rate_limit["resetTime"]`` is interpreted as a POSIX timestamp,
        converted to UTC, stripped of its tzinfo and then formatted with
        ``utils.isotime``.
        """
        reset_at = datetime.datetime.fromtimestamp(
            rate_limit["resetTime"], tz=datetime.timezone.utc)
        naive_reset_at = reset_at.replace(tzinfo=None)
        return {
            "verb": rate_limit["verb"],
            "value": rate_limit["value"],
            "remaining": int(rate_limit["remaining"]),
            "unit": rate_limit["unit"],
            "next-available": utils.isotime(at=naive_reset_at),
        }

    @common.ViewBuilder.versioned_method("2.58")
    def add_share_group_quotas(self, request, limit_names, absolute_limits):
        """Add share group and share group snapshot names to limit_names."""
        limit_names["limit"].update({
            "share_groups": ["maxTotalShareGroups"],
            "share_group_snapshots": ["maxTotalShareGroupSnapshots"],
        })
        limit_names["in_use"].update({
            "share_groups": ["totalShareGroupsUsed"],
            "share_group_snapshots": ["totalShareGroupSnapshotsUsed"],
        })

    @common.ViewBuilder.versioned_method("2.53")
    def add_share_replica_quotas(self, request, limit_names, absolute_limits):
        """Add share replica and replica gigabyte names to limit_names."""
        limit_names["limit"].update({
            "share_replicas": ["maxTotalShareReplicas"],
            "replica_gigabytes": ["maxTotalReplicaGigabytes"],
        })
        limit_names["in_use"].update({
            "share_replicas": ["totalShareReplicasUsed"],
            "replica_gigabytes": ["totalReplicaGigabytesUsed"],
        })

    @common.ViewBuilder.versioned_method("2.80")
    def add_share_backup_quotas(self, request, limit_names, absolute_limits):
        """Add share backup and backup gigabyte names to limit_names."""
        limit_names["limit"].update({
            "backups": ["maxTotalShareBackups"],
            "backup_gigabytes": ["maxTotalBackupGigabytes"],
        })
        limit_names["in_use"].update({
            "backups": ["totalShareBackupsUsed"],
            "backup_gigabytes": ["totalBackupGigabytesUsed"],
        })