Coverage for manila/share/drivers/maprfs/maprfs_native.py: 97%
245 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2016, MapR Technologies
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""
17Share driver for MapR-FS distributed file system.
18"""
19import math
20import os
22from oslo_config import cfg
23from oslo_log import log
24from oslo_utils import strutils
25from oslo_utils import units
27from manila import context
28from manila import exception
29from manila.i18n import _
30from manila.share import api
31from manila.share import driver
33from manila.share.drivers.maprfs import driver_util as mapru
35LOG = log.getLogger(__name__)
37maprfs_native_share_opts = [
38 cfg.ListOpt('maprfs_clinode_ip',
39 help='The list of IPs or hostnames of nodes where mapr-core '
40 'is installed.'),
41 cfg.PortOpt('maprfs_ssh_port',
42 default=22,
43 help='CLDB node SSH port.'),
44 cfg.StrOpt('maprfs_ssh_name',
45 default="mapr",
46 help='Cluster admin user ssh login name.'),
47 cfg.StrOpt('maprfs_ssh_pw',
48 secret=True,
49 help='Cluster node SSH login password, '
50 'This parameter is not necessary, if '
51 '\'maprfs_ssh_private_key\' is configured.'),
52 cfg.StrOpt('maprfs_ssh_private_key',
53 help='Path to SSH private '
54 'key for login.'),
55 cfg.StrOpt('maprfs_base_volume_dir',
56 default='/',
57 help='Path in MapRFS where share volumes must be created.'),
58 cfg.ListOpt('maprfs_zookeeper_ip',
59 help='The list of IPs or hostnames of ZooKeeper nodes.'),
60 cfg.ListOpt('maprfs_cldb_ip',
61 help='The list of IPs or hostnames of CLDB nodes.'),
62 cfg.BoolOpt('maprfs_rename_managed_volume',
63 default=True,
64 help='Specify whether existing volume should be renamed when'
65 ' start managing.'),
66]
68CONF = cfg.CONF
69CONF.register_opts(maprfs_native_share_opts)
72class MapRFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver):
73 """MapR-FS Share Driver.
75 Executes commands relating to shares.
76 driver_handles_share_servers must be False because this driver does not
77 support creating or managing virtual storage servers (share servers)
78 API version history:
80 1.0 - Initial Version
81 """
83 def __init__(self, *args, **kwargs):
84 super(MapRFSNativeShareDriver, self).__init__(False, *args, **kwargs)
85 self.configuration.append_config_values(maprfs_native_share_opts)
86 self.backend_name = self.configuration.safe_get(
87 'share_backend_name') or 'MapR-FS-Native'
88 self._base_volume_dir = self.configuration.safe_get(
89 'maprfs_base_volume_dir') or '/'
90 self._maprfs_util = None
91 self._maprfs_base_path = "maprfs://"
92 self.cldb_ip = self.configuration.maprfs_cldb_ip or []
93 self.zookeeper_ip = self.configuration.maprfs_zookeeper_ip or []
94 self.rename_volume = self.configuration.maprfs_rename_managed_volume
95 self.api = api.API()
97 def do_setup(self, context):
98 """Do initialization while the share driver starts."""
99 super(MapRFSNativeShareDriver, self).do_setup(context)
100 self._maprfs_util = mapru.get_version_handler(self.configuration)
102 def _share_dir(self, share_name):
103 return os.path.join(self._base_volume_dir, share_name)
105 def _volume_name(self, share_name):
106 return share_name
108 def _get_share_path(self, share):
109 return share['export_location']
111 def _get_snapshot_path(self, snapshot):
112 share_dir = snapshot['share_instance']['export_location'].split(
113 ' ')[0][len(self._maprfs_base_path):]
114 return os.path.join(share_dir, '.snapshot',
115 snapshot['provider_location'] or snapshot['name'])
117 def _get_volume_name(self, context, share):
118 metadata = self.api.get_share_metadata(context,
119 {'id': share['share_id']})
120 return metadata.get('_name', self._volume_name(share['name']))
122 def _get_share_export_locations(self, share, path=None):
123 """Return share path on storage provider."""
124 cluster_name = self._maprfs_util.get_cluster_name()
125 path = '%(path)s -C %(cldb)s -Z %(zookeeper)s -N %(name)s' % {
126 'path': self._maprfs_base_path + (
127 path or self._share_dir(share['name'])),
128 'cldb': ' '.join(self.cldb_ip),
129 'zookeeper': ' '.join(self.zookeeper_ip),
130 'name': cluster_name
131 }
132 export_list = [{
133 "path": path,
134 "is_admin_only": False,
135 "metadata": {
136 "cldb": ','.join(self.cldb_ip),
137 "zookeeper": ','.join(self.zookeeper_ip),
138 "cluster-name": cluster_name,
139 },
140 }]
142 return export_list
144 def _create_share(self, share, metadata, context):
145 """Creates a share."""
146 if share['share_proto'].lower() != 'maprfs':
147 msg = _('Only MapRFS protocol supported!')
148 LOG.error(msg)
149 raise exception.MapRFSException(msg=msg)
150 options = {k[1:]: v for k, v in metadata.items() if k[0] == '_'}
151 share_dir = options.pop('path', self._share_dir(share['name']))
152 volume_name = options.pop('name', self._volume_name(share['name']))
153 try:
154 self._maprfs_util.create_volume(volume_name, share_dir,
155 share['size'],
156 **options)
157 # posix permissions should be 777, ACEs are used as a restriction
158 self._maprfs_util.maprfs_chmod(share_dir, '777')
159 except exception.ProcessExecutionError:
160 self.api.update_share_metadata(context,
161 {'id': share['share_id']},
162 {'_name': 'error'})
163 msg = (_('Failed to create volume in MapR-FS for the '
164 'share %(share_name)s.') % {'share_name': share['name']})
165 LOG.exception(msg)
166 raise exception.MapRFSException(msg=msg)
168 def _set_share_size(self, share, size):
169 volume_name = self._get_volume_name(context.get_admin_context(), share)
170 try:
171 if share['size'] > size:
172 info = self._maprfs_util.get_volume_info(volume_name)
173 used = info['totalused']
174 if int(used) >= int(size) * units.Ki: 174 ↛ 177line 174 didn't jump to line 177 because the condition on line 174 was always true
175 raise exception.ShareShrinkingPossibleDataLoss(
176 share_id=share['id'])
177 self._maprfs_util.set_volume_size(volume_name, size)
178 except exception.ProcessExecutionError:
179 msg = (_('Failed to set space quota for the share %(share_name)s.')
180 % {'share_name': share['name']})
181 LOG.exception(msg)
182 raise exception.MapRFSException(msg=msg)
184 def get_network_allocations_number(self):
185 return 0
187 def create_share(self, context, share, share_server=None):
188 """Create a MapRFS volume which acts as a share."""
189 metadata = self.api.get_share_metadata(context,
190 {'id': share['share_id']})
191 self._create_share(share, metadata, context)
192 return self._get_share_export_locations(share,
193 path=metadata.get('_path'))
195 def ensure_share(self, context, share, share_server=None):
196 """Updates export location if it is changes."""
197 volume_name = self._get_volume_name(context, share)
198 if self._maprfs_util.volume_exists(volume_name): 198 ↛ 207line 198 didn't jump to line 207 because the condition on line 198 was always true
199 info = self._maprfs_util.get_volume_info(volume_name)
200 path = info['mountdir']
201 old_location = share['export_locations'][0]
202 new_location = self._get_share_export_locations(
203 share, path=path)
204 if new_location[0]['path'] != old_location['path']: 204 ↛ exitline 204 didn't return from function 'ensure_share' because the condition on line 204 was always true
205 return new_location
206 else:
207 raise exception.ShareResourceNotFound(share_id=share['share_id'])
209 def create_share_from_snapshot(self, context, share, snapshot,
210 share_server=None, parent_share=None):
211 """Creates a share from snapshot."""
212 metadata = self.api.get_share_metadata(context,
213 {'id': share['share_id']})
214 sn_share_tenant = self.api.get_share_metadata(context, {
215 'id': snapshot['share_instance']['share_id']}).get('_tenantuser')
216 if sn_share_tenant and sn_share_tenant != metadata.get('_tenantuser'):
217 msg = (
218 _('Cannot create share from snapshot %(snapshot_name)s '
219 'with name %(share_name)s. Error: Tenant user should not '
220 'differ from tenant of the source snapshot.') %
221 {'snapshot_name': snapshot['name'],
222 'share_name': share['name']})
223 LOG.error(msg)
224 raise exception.MapRFSException(msg=msg)
225 share_dir = metadata.get('_path', self._share_dir(share['name']))
226 snapshot_path = self._get_snapshot_path(snapshot)
227 self._create_share(share, metadata, context)
229 try:
230 if self._maprfs_util.dir_not_empty(snapshot_path): 230 ↛ 240line 230 didn't jump to line 240 because the condition on line 230 was always true
231 self._maprfs_util.maprfs_cp(snapshot_path + '/*', share_dir)
232 except exception.ProcessExecutionError:
233 msg = (
234 _('Failed to create share from snapshot %(snapshot_name)s '
235 'with name %(share_name)s.') % {
236 'snapshot_name': snapshot['name'],
237 'share_name': share['name']})
238 LOG.exception(msg)
239 raise exception.MapRFSException(msg=msg)
240 return self._get_share_export_locations(share,
241 path=metadata.get('_path'))
243 def create_snapshot(self, context, snapshot, share_server=None):
244 """Creates a snapshot."""
245 volume_name = self._get_volume_name(context, snapshot['share'])
246 snapshot_name = snapshot['name']
247 try:
248 self._maprfs_util.create_snapshot(snapshot_name, volume_name)
249 return {'provider_location': snapshot_name}
250 except exception.ProcessExecutionError:
251 msg = (
252 _('Failed to create snapshot %(snapshot_name)s for the share '
253 '%(share_name)s.') % {'snapshot_name': snapshot_name,
254 'share_name': snapshot['share_name']})
255 LOG.exception(msg)
256 raise exception.MapRFSException(msg=msg)
258 def delete_share(self, context, share, share_server=None):
259 """Deletes share storage."""
260 volume_name = self._get_volume_name(context, share)
261 if volume_name == "error":
262 LOG.info("Skipping deleting share with name %s, as it does not"
263 " exist on the backend", share['name'])
264 return
265 try:
266 self._maprfs_util.delete_volume(volume_name)
267 except exception.ProcessExecutionError:
268 msg = (_('Failed to delete share %(share_name)s.') %
269 {'share_name': share['name']})
270 LOG.exception(msg)
271 raise exception.MapRFSException(msg=msg)
273 def delete_snapshot(self, context, snapshot, share_server=None):
274 """Deletes a snapshot."""
275 snapshot_name = snapshot['provider_location'] or snapshot['name']
276 volume_name = self._get_volume_name(context, snapshot['share'])
277 try:
278 self._maprfs_util.delete_snapshot(snapshot_name, volume_name)
279 except exception.ProcessExecutionError:
280 msg = (_('Failed to delete snapshot %(snapshot_name)s.') %
281 {'snapshot_name': snapshot['name']})
282 LOG.exception(msg)
283 raise exception.MapRFSException(msg=msg)
285 def update_access(self, context, share, access_rules, add_rules,
286 delete_rules, update_rules, share_server=None):
287 """Update access rules for given share."""
288 for access in access_rules:
289 if access['access_type'].lower() != 'user':
290 msg = _("Only 'user' access type allowed!")
291 LOG.error(msg)
292 raise exception.InvalidShareAccess(reason=msg)
293 volume_name = self._get_volume_name(context, share)
294 try:
295 # 'update_access' is called before share is removed, so this
296 # method shouldn`t raise exception if share does
297 # not exist actually
298 if not self._maprfs_util.volume_exists(volume_name):
299 LOG.warning('Can not get share %s.', share['name'])
300 return
301 # check update
302 if add_rules or delete_rules:
303 self._maprfs_util.remove_volume_ace_rules(volume_name,
304 delete_rules)
305 self._maprfs_util.add_volume_ace_rules(volume_name, add_rules)
306 else:
307 self._maprfs_util.set_volume_ace(volume_name, access_rules)
308 except exception.ProcessExecutionError:
309 msg = (_('Failed to update access for share %(name)s.') %
310 {'name': share['name']})
311 LOG.exception(msg)
312 raise exception.MapRFSException(msg=msg)
314 def extend_share(self, share, new_size, share_server=None):
315 """Extend share storage."""
316 self._set_share_size(share, new_size)
318 def shrink_share(self, share, new_size, share_server=None):
319 """Shrink share storage."""
320 self._set_share_size(share, new_size)
322 def _check_maprfs_state(self):
323 try:
324 return self._maprfs_util.check_state()
325 except exception.ProcessExecutionError:
326 msg = _('Failed to check MapRFS state.')
327 LOG.exception(msg)
328 raise exception.MapRFSException(msg=msg)
330 def check_for_setup_error(self):
331 """Return an error if the prerequisites are not met."""
332 if not self.configuration.maprfs_clinode_ip:
333 msg = _(
334 'MapR cluster has not been specified in the configuration. '
335 'Add the ip or list of ip of nodes with mapr-core installed '
336 'in the "maprfs_clinode_ip" configuration parameter.')
337 LOG.error(msg)
338 raise exception.MapRFSException(msg=msg)
340 if not self.configuration.maprfs_cldb_ip: 340 ↛ 343line 340 didn't jump to line 343 because the condition on line 340 was always true
341 LOG.warning('CLDB nodes are not specified!')
343 if not self.configuration.maprfs_zookeeper_ip: 343 ↛ 346line 343 didn't jump to line 346 because the condition on line 343 was always true
344 LOG.warning('Zookeeper nodes are not specified!')
346 if not self._check_maprfs_state():
347 msg = _('MapR-FS is not in healthy state.')
348 LOG.error(msg)
349 raise exception.MapRFSException(msg=msg)
350 try:
351 self._maprfs_util.maprfs_ls(
352 os.path.join(self._base_volume_dir, ''))
353 except exception.ProcessExecutionError:
354 msg = _('Invalid "maprfs_base_volume_name". No such directory.')
355 LOG.exception(msg)
356 raise exception.MapRFSException(msg=msg)
358 def manage_existing(self, share, driver_options):
359 try:
360 # retrieve share path from export location, maprfs:// prefix and
361 # metadata (-C -Z -N) should be casted away
362 share_path = share['export_location'].split(
363 )[0][len(self._maprfs_base_path):]
364 info = self._maprfs_util.get_volume_info_by_path(
365 share_path, check_if_exists=True)
366 if not info:
367 msg = _("Share %s not found") % share[
368 'export_location']
369 LOG.error(msg)
370 raise exception.ManageInvalidShare(reason=msg)
371 size = math.ceil(float(info['quota']) / units.Ki)
372 used = math.ceil(float(info['totalused']) / units.Ki)
373 volume_name = info['volumename']
374 should_rename = self.rename_volume
375 rename_option = driver_options.get('rename')
376 if rename_option:
377 should_rename = strutils.bool_from_string(rename_option)
378 if should_rename:
379 self._maprfs_util.rename_volume(volume_name, share['name'])
380 else:
381 self.api.update_share_metadata(context.get_admin_context(),
382 {'id': share['share_id']},
383 {'_name': volume_name})
384 location = self._get_share_export_locations(share, path=share_path)
385 if size == 0:
386 size = used
387 msg = (
388 'Share %s has no size quota. Total used value will be'
389 ' used as share size')
390 LOG.warning(msg, share['name'])
391 return {'size': size, 'export_locations': location}
392 except (ValueError, KeyError, exception.ProcessExecutionError):
393 msg = _('Failed to manage share.')
394 LOG.exception(msg)
395 raise exception.MapRFSException(msg=msg)
397 def manage_existing_snapshot(self, snapshot, driver_options):
398 volume_name = self._get_volume_name(context.get_admin_context(),
399 snapshot['share'])
400 snapshot_path = self._get_snapshot_path(snapshot)
401 try:
402 snapshot_list = self._maprfs_util.get_snapshot_list(
403 volume_name=volume_name)
404 snapshot_name = snapshot['provider_location']
405 if snapshot_name not in snapshot_list:
406 msg = _("Snapshot %s not found") % snapshot_name
407 LOG.error(msg)
408 raise exception.ManageInvalidShareSnapshot(reason=msg)
409 size = math.ceil(float(self._maprfs_util.maprfs_du(
410 snapshot_path)) / units.Gi)
411 return {'size': size}
412 except exception.ProcessExecutionError:
413 msg = _("Manage existing share snapshot failed.")
414 LOG.exception(msg)
415 raise exception.MapRFSException(msg=msg)
417 def _update_share_stats(self):
418 """Retrieves stats info of share directories group."""
419 try:
420 total, free = self._maprfs_util.fs_capacity()
421 except exception.ProcessExecutionError:
422 msg = _('Failed to check MapRFS capacity info.')
423 LOG.exception(msg)
424 raise exception.MapRFSException(msg=msg)
425 total_capacity_gb = int(math.ceil(float(total) / units.Gi))
426 free_capacity_gb = int(math.floor(float(free) / units.Gi))
427 data = {
428 'share_backend_name': self.backend_name,
429 'storage_protocol': 'MAPRFS',
430 'driver_handles_share_servers': self.driver_handles_share_servers,
431 'vendor_name': 'MapR Technologies',
432 'driver_version': '1.0',
433 'total_capacity_gb': total_capacity_gb,
434 'free_capacity_gb': free_capacity_gb,
435 'snapshot_support': True,
436 'create_share_from_snapshot_support': True,
437 }
439 super(MapRFSNativeShareDriver, self)._update_share_stats(data)