Coverage for manila/share/drivers/cephfs/driver.py: 79%
738 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:39 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:39 +0000
1# Copyright (c) 2016 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
17import ipaddress
18import json
19import math
20import re
21import socket
22import sys
24from oslo_config import cfg
25from oslo_config import types
26from oslo_log import log
27from oslo_utils import importutils
28from oslo_utils import timeutils
29from oslo_utils import units
31from manila.common import constants
32from manila import exception
33from manila.i18n import _
34from manila.message import api as message_api
35from manila.message import message_field
36from manila.share import driver
37from manila.share.drivers import ganesha
38from manila.share.drivers.ganesha import utils as ganesha_utils
39from manila.share.drivers import helpers as driver_helpers
# Module-level handles that start out unset:
#  * ``rados`` and ``json_command`` are filled in by setup_rados() and
#    setup_json_command().
#  * ``ceph_default_target`` is filled in by
#    CephFSDriver.setup_default_ceph_cmd_target().
rados = json_command = ceph_default_target = None
def setup_rados():
    """Load the ``rados`` python module into the module-level global.

    Nothing happens when the global already holds a truthy value.

    :raises ShareBackendException: if the module cannot be imported.
    """
    global rados
    if rados:
        return

    try:
        rados = importutils.import_module('rados')
    except ImportError:
        raise exception.ShareBackendException(
            _("rados python module is not installed"))
def setup_json_command():
    """Load ``ceph_argparse.json_command`` into the module-level global.

    Nothing happens when the global already holds a truthy value.

    :raises ShareBackendException: if the import fails.
    """
    global json_command
    if json_command:
        return

    try:
        json_command = importutils.import_class(
            'ceph_argparse.json_command')
    except ImportError:
        raise exception.ShareBackendException(
            _("ceph_argparse python module is not installed"))
LOG = log.getLogger(__name__)

# Access type name used for cephx rules.
CEPHX_ACCESS_TYPE = "cephx"

# The default Ceph administrative identity
CEPH_DEFAULT_AUTH_ID = "admin"

# Default value of the 'cephfs_volume_mode' option (octal, as a string).
DEFAULT_VOLUME_MODE = '755'

# Timeout handed to json_command by rados_command().
RADOS_TIMEOUT = 10

# Clone statuses
(CLONE_CREATING,
 CLONE_FAILED,
 CLONE_CANCELED,
 CLONE_PENDING,
 CLONE_INPROGRESS,
 CLONE_COMPLETE) = ('creating',
                    'failed',
                    'canceled',
                    'pending',
                    'in-progress',
                    'complete')
# Deprecation keywords shared by every option that only exists for the
# deprecated NFSProtocolHelper.
_NFS_PROTOCOL_HELPER_DEPRECATION = {
    'deprecated_for_removal': True,
    'deprecated_since': '2025.1',
    'deprecated_reason': "This option is used by the deprecated "
                         "NFSProtocolHelper",
}

# Options understood by every flavour of the CephFS driver.
cephfs_opts = [
    cfg.StrOpt(
        'cephfs_conf_path',
        default="",
        help="Fully qualified path to the ceph.conf file."),
    cfg.StrOpt(
        'cephfs_cluster_name',
        help="The name of the cluster in use, if it is not "
             "the default ('ceph')."),
    cfg.StrOpt(
        'cephfs_auth_id',
        default="manila",
        help="The name of the ceph auth identity to use."),
    cfg.StrOpt(
        'cephfs_volume_path_prefix',
        default="/volumes",
        help="The prefix of the cephfs volume path.",
        deprecated_for_removal=True,
        deprecated_since='Wallaby',
        deprecated_reason='This option is not used starting with '
                          'the Nautilus release of Ceph.'),
    cfg.StrOpt(
        'cephfs_protocol_helper_type',
        default="CEPHFS",
        choices=['CEPHFS', 'NFS'],
        ignore_case=True,
        help="The type of protocol helper to use. Default is "
             "CEPHFS."),
    cfg.BoolOpt(
        'cephfs_ganesha_server_is_remote',
        default=False,
        help="Whether the NFS-Ganesha server is remote to the driver.",
        **_NFS_PROTOCOL_HELPER_DEPRECATION),
    cfg.HostAddressOpt(
        'cephfs_ganesha_server_ip',
        help="The IP address of the NFS-Ganesha server."),
    cfg.StrOpt(
        'cephfs_ganesha_server_username',
        default='root',
        help="The username to authenticate as in the remote "
             "NFS-Ganesha server host.",
        **_NFS_PROTOCOL_HELPER_DEPRECATION),
    cfg.StrOpt(
        'cephfs_ganesha_path_to_private_key',
        help="The path of the driver host's private SSH key file.",
        **_NFS_PROTOCOL_HELPER_DEPRECATION),
    cfg.StrOpt(
        'cephfs_ganesha_server_password',
        secret=True,
        help="The password to authenticate as the user in the remote "
             "Ganesha server host. This is not required if "
             "'cephfs_ganesha_path_to_private_key' is configured.",
        **_NFS_PROTOCOL_HELPER_DEPRECATION),
    cfg.ListOpt(
        'cephfs_ganesha_export_ips',
        default=[],
        help="List of IPs to export shares. If not supplied, "
             "then the value of 'cephfs_ganesha_server_ip' "
             "will be used to construct share export locations."),
    cfg.StrOpt(
        'cephfs_volume_mode',
        default=DEFAULT_VOLUME_MODE,
        help="The read/write/execute permissions mode for CephFS "
             "volumes, snapshots, and snapshot groups expressed in "
             "Octal as with linux 'chmod' or 'umask' commands."),
    cfg.StrOpt(
        'cephfs_filesystem_name',
        help="The name of the filesystem to use, if there are "
             "multiple filesystems in the cluster."),
    cfg.StrOpt(
        'cephfs_ensure_all_shares_salt',
        default="manila_cephfs_reef_caracal",
        help="Provide a unique string value to make the driver "
             "ensure all of the shares it has created during "
             "startup. Ensuring would re-export shares and this "
             "action isn't always required, unless something has "
             "been administratively modified on CephFS."),
    cfg.IntOpt(
        'cephfs_cached_allocated_capacity_update_interval',
        min=0,
        default=60,
        help="The maximum time in seconds that the cached pool "
             "data will be considered updated. If it is expired when "
             "trying to read the pool data, it must be refreshed."),
]

# Options that are only relevant when an NFS cluster is used.
cephfsnfs_opts = [
    cfg.StrOpt(
        'cephfs_nfs_cluster_id',
        help="The ID of the NFS cluster to use."),
]


CONF = cfg.CONF
for _opt_group in (cephfs_opts, cephfsnfs_opts):
    CONF.register_opts(_opt_group)
class RadosError(Exception):
    """Something went wrong talking to Ceph with librados"""
class AllocationCapacityCache(object):
    """Time-bounded holder for the allocated capacity of a CephFS filesystem.

    Freshness is tracked with an oslo ``timeutils.StopWatch`` built from
    ``duration``; the stop watch is not thread-safe.
    """

    def __init__(self, duration):
        """Create an empty cache whose stop watch uses ``duration``."""
        self._cached_allocated_capacity = None
        self._stop_watch = timeutils.StopWatch(duration)

    def is_expired(self):
        """Tell whether the cache was never filled or has expired."""
        watch = self._stop_watch
        if watch.has_started():
            return watch.expired()
        return True

    def get_data(self):
        """Return whatever was stored by the last update_data() call."""
        return self._cached_allocated_capacity

    def update_data(self, cached_allocated_capacity):
        """Store a new value and start (or restart) the stop watch."""
        watch = self._stop_watch
        if watch.has_started():
            watch.restart()
        else:
            watch.start()

        self._cached_allocated_capacity = cached_allocated_capacity
def rados_command(rados_client, prefix=None, args=None,
                  json_obj=False, target=None, inbuf=None):
    """Safer wrapper for ceph_argparse.json_command

    Raises an exception instead of relying on the caller to check return
    codes.

    The exception can result from:
    * Timeout
    * Actual legitimate errors
    * Malformed JSON output

    :param rados_client: client object handed to json_command.
    :param prefix: the command prefix, e.g. "fs subvolume ls".
    :param args: optional dict of command arguments; it is copied, and
        ``format=json`` is added to the copy.
    :param json_obj: when True, decode the command output as JSON.
    :param target: command target; defaults to the module-level
        ``ceph_default_target``.
    :param inbuf: input buffer for the command; defaults to ``b''``.
    :return: If json_obj is True, the decoded JSON object from ceph, or
        None if the output was empty. Otherwise the decoded, stripped
        output string.
    :raises ShareBackendException: on any failure, including a non-zero
        return code from json_command.
    """
    target = target or ceph_default_target

    argdict = dict(args) if args is not None else {}
    argdict['format'] = 'json'

    if inbuf is None:
        inbuf = b''

    LOG.debug("Invoking ceph_argparse.json_command - rados_client=%(cl)s, "
              "target=%(tg)s, prefix='%(pf)s', argdict=%(ad)s, inbuf=%(ib)s, "
              "timeout=%(to)s.",
              {"cl": rados_client, "tg": target, "pf": prefix, "ad": argdict,
               "ib": inbuf, "to": RADOS_TIMEOUT})

    try:
        ret, outbuf, outs = json_command(rados_client,
                                         target=target,
                                         prefix=prefix,
                                         argdict=argdict,
                                         inbuf=inbuf,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs, ret)
        if not json_obj:
            result = outbuf.decode().strip()
        elif outbuf:
            result = json.loads(outbuf.decode().strip())
        else:
            result = None
    except Exception as e:
        # Translate the template first and interpolate afterwards, so the
        # message id handed to the translation function is the constant
        # template rather than an already-formatted string.
        msg = _("json_command failed - prefix=%(pfx)s, argdict=%(ad)s - "
                "exception message: %(ex)s.") % {
            "pfx": prefix, "ad": argdict, "ex": e}
        raise exception.ShareBackendException(msg)

    return result
276class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
277 driver.ShareDriver):
278 """Driver for the Ceph Filesystem."""
def __init__(self, *args, **kwargs):
    """Set up driver state, load Ceph bindings and validate the config.

    :raises BadConfigurationException: if 'cephfs_volume_mode' is not a
        valid octal string.
    """
    super(CephFSDriver, self).__init__(False, *args, **kwargs)
    configured_name = self.configuration.safe_get('share_backend_name')
    self.backend_name = configured_name or 'CephFS'

    setup_rados()
    setup_json_command()

    # Populated lazily by the properties of the same (unprefixed) name.
    self._rados_client = None
    # name of the filesystem/volume used by the driver
    self._volname = None
    self._ceph_mon_version = None

    for option_group in (cephfs_opts, cephfsnfs_opts):
        self.configuration.append_config_values(option_group)

    self._cached_allocated_capacity_gb = None
    self.private_storage = kwargs.get('private_storage')

    # The mode must parse as an octal number; it is stored unparsed.
    try:
        int(self.configuration.cephfs_volume_mode, 8)
    except ValueError:
        msg = _("Invalid CephFS volume mode %s")
        raise exception.BadConfigurationException(
            msg % self.configuration.cephfs_volume_mode)

    self._cephfs_volume_mode = self.configuration.cephfs_volume_mode
    self.ipv6_implemented = True
def do_setup(self, context):
    """Pick and initialise the protocol helper, then prime the capacity cache.

    :param context: unused here.
    """
    conf = self.configuration
    if conf.cephfs_protocol_helper_type.upper() == "CEPHFS":
        helper_name = 'NativeProtocolHelper'
    elif conf.cephfs_nfs_cluster_id is None:
        # FIXME(vkmc) we intent to replace NFSProtocolHelper
        # with NFSClusterProtocolHelper helper in BB/CC release
        helper_name = 'NFSProtocolHelper'
    else:
        helper_name = 'NFSClusterProtocolHelper'
    # Helper classes are looked up by name on this module.
    protocol_helper_class = getattr(sys.modules[__name__], helper_name)

    self.setup_default_ceph_cmd_target()

    self.protocol_helper = protocol_helper_class(
        self._execute,
        conf,
        rados_client=self.rados_client,
        volname=self.volname)
    self.protocol_helper.init_helper()

    # Measure the allocation once so the first stats update can use it.
    initial_allocation_gb = self._get_cephfs_filesystem_allocation()
    self._cached_allocated_capacity_gb = AllocationCapacityCache(
        conf.cephfs_cached_allocated_capacity_update_interval)
    self._cached_allocated_capacity_gb.update_data(initial_allocation_gb)
def check_for_setup_error(self):
    """Delegate the prerequisite check to the protocol helper."""
    helper = self.protocol_helper
    helper.check_for_setup_error()
def _get_cephfs_filesystem_allocation(self):
    """Sum the quotas of the subvolumes listed for the driver's volume.

    Each quota is converted to GiB and rounded to two decimals before
    being added. Subvolumes reporting an "infinite" quota are skipped.

    :return: the allocated capacity in GiB.
    :raises ShareBackendException: propagated from rados_command.
    """
    allocated_capacity_gb = 0
    argdict = {"vol_name": self.volname}
    # rados_command returns None when the command produced no output;
    # treat that the same as "no subvolumes".
    subvolumes = rados_command(
        self.rados_client, "fs subvolume ls", argdict, json_obj=True) or []
    for sub_vol in subvolumes:
        argdict = {"vol_name": self.volname, "sub_name": sub_vol["name"]}
        sub_info = rados_command(
            self.rados_client, "fs subvolume info", argdict,
            json_obj=True) or {}
        size = sub_info.get('bytes_quota', 0)
        if size == "infinite":
            # If we have a share that has infinite quota, we should not
            # add that to the allocated capacity as that would make the
            # scheduler think this backend is full.
            continue
        allocated_capacity_gb += round(int(size) / units.Gi, 2)
    return allocated_capacity_gb
def _update_share_stats(self):
    """Collect capacity figures and hand the stats dict to the base class.

    The allocated capacity is recomputed only when the cache has
    expired; otherwise the cached value is reported.
    """
    stats = self.rados_client.get_cluster_stats()

    total_capacity_gb = round(stats['kb'] / units.Mi, 2)
    free_capacity_gb = round(stats['kb_avail'] / units.Mi, 2)

    cache = self._cached_allocated_capacity_gb
    if cache.is_expired():
        allocated_capacity_gb = self._get_cephfs_filesystem_allocation()
        cache.update_data(allocated_capacity_gb)
    else:
        allocated_capacity_gb = cache.get_data()

    safe_get = self.configuration.safe_get
    reserved_percentage = safe_get('reserved_share_percentage')
    # The two specialised reservations fall back to the generic one.
    reserved_snapshot_percentage = (
        safe_get('reserved_share_from_snapshot_percentage') or
        safe_get('reserved_share_percentage'))
    reserved_share_extend_percentage = (
        safe_get('reserved_share_extend_percentage') or
        safe_get('reserved_share_percentage'))

    pool = {
        'pool_name': 'cephfs',
        'total_capacity_gb': total_capacity_gb,
        'free_capacity_gb': free_capacity_gb,
        'allocated_capacity_gb': allocated_capacity_gb,
        'qos': 'False',
        'reserved_percentage': reserved_percentage,
        'reserved_snapshot_percentage': reserved_snapshot_percentage,
        'reserved_share_extend_percentage':
            reserved_share_extend_percentage,
        'dedupe': [False],
        'compression': [False],
        'thin_provisioning': [True]
    }

    data = {
        'vendor_name': 'Ceph',
        'driver_version': '1.0',
        'share_backend_name': self.backend_name,
        'storage_protocol': safe_get('cephfs_protocol_helper_type'),
        'pools': [pool],
        'total_capacity_gb': total_capacity_gb,
        'free_capacity_gb': free_capacity_gb,
        'allocated_capacity_gb': allocated_capacity_gb,
        'snapshot_support': True,
        'create_share_from_snapshot_support': True,
    }
    super(  # pylint: disable=no-member
        CephFSDriver, self)._update_share_stats(data)
def _to_bytes(self, gigs):
    """Turn a Manila size (gibibytes) into a byte count.

    :param gigs: integer number of gibibytes.
    :return: integer number of bytes.
    """
    bytes_per_gib = units.Gi
    return gigs * bytes_per_gib
421 def _get_subvolume_name(self, share_id):
422 try:
423 subvolume_name = self.private_storage.get(
424 share_id, "subvolume_name")
425 except Exception:
426 return share_id
427 # Subvolume name could be None, so in case it is, return share_id
428 return subvolume_name or share_id
430 def _get_subvolume_snapshot_name(self, snapshot_id):
431 try:
432 subvolume_snapshot_name = self.private_storage.get(
433 snapshot_id, "subvolume_snapshot_name"
434 )
435 except Exception:
436 return snapshot_id
437 return subvolume_snapshot_name or snapshot_id
def _get_export_locations(self, share, subvolume_name=None):
    """Get the export location for a share.

    :param share: a manila share.
    :param subvolume_name: name of the backing subvolume; the share id
        is used when this is not given.
    :return: the export location for a share.
    """
    # get path of FS subvolume/share
    getpath_args = {
        "vol_name": self.volname,
        "sub_name": subvolume_name if subvolume_name else share["id"],
    }
    group_id = share['share_group_id']
    if group_id is not None:
        getpath_args["group_name"] = group_id

    subvolume_path = rados_command(
        self.rados_client, "fs subvolume getpath", getpath_args)

    return self.protocol_helper.get_export_locations(share, subvolume_path)
def get_optional_share_creation_data(self, share, share_server=None):
    """Ask the protocol helper for additional properties of a share.

    :param share: a manila share.
    :param share_server: not used.
    :return: the metadata to be set in share.
    """
    helper = self.protocol_helper
    return helper.get_optional_share_creation_data(share)
def setup_default_ceph_cmd_target(self):
    """Set the module-level default command target, once.

    The target defaults to ``('mon-mgr', )``; Ceph 14.x gets
    ``('mgr', )`` and anything older is rejected. Nothing is done when a
    default target has already been set.

    :raises ShareBackendException: if the monitor version cannot be read
        or the cluster is older than 14.x.
    """
    global ceph_default_target
    if not ceph_default_target:
        ceph_default_target = ('mon-mgr', )

        try:
            # The version components are stored as strings; compare them
            # numerically, otherwise e.g. '9' sorts after '14'.
            ceph_major_version = int(self.ceph_mon_version['major'])
        except Exception:
            msg = _("Error reading ceph version to set the default "
                    "target. Please check your Ceph backend is reachable.")
            raise exception.ShareBackendException(msg=msg)

        if ceph_major_version == 14:
            ceph_default_target = ('mgr', )
        elif ceph_major_version < 14:
            msg = _("CephFSDriver does not support Ceph "
                    "cluster version less than 14.x (Nautilus)")
            raise exception.ShareBackendException(msg=msg)
@property
def ceph_mon_version(self):
    """Monitor version as a dict with 'major', 'minor' and 'extra' strings.

    The value is obtained from the "version" command on first access and
    reused afterwards.
    """
    if self._ceph_mon_version:
        return self._ceph_mon_version

    self._ceph_mon_version = {}

    raw_output = rados_command(
        self.rados_client, "version", target=('mon', ))
    version_str = json.loads(raw_output)["version"]

    matched = re.match(r"ceph version (\d+)\.(\d+)\.(\d+)", version_str)
    for key, value in zip(('major', 'minor', 'extra'), matched.groups()):
        self._ceph_mon_version[key] = value

    return self._ceph_mon_version
@property
def rados_client(self):
    """Connected rados client, created and connected on first access.

    :raises ShareBackendException: if connecting fails; the cached client
        is reset so that a later access tries again.
    """
    if self._rados_client:
        return self._rados_client

    conf_path = self.configuration.safe_get('cephfs_conf_path')
    cluster_name = self.configuration.safe_get('cephfs_cluster_name')
    auth_id = self.configuration.safe_get('cephfs_auth_id')
    self._rados_client = rados.Rados(
        name="client.{0}".format(auth_id),
        clustername=cluster_name,
        conffile=conf_path,
        conf={}
    )

    LOG.info("[%(be)s] Ceph client found, connecting...",
             {"be": self.backend_name})
    try:
        if self._rados_client.state != "connected":
            self._rados_client.connect()
    except Exception:
        self._rados_client = None
        # Interpolate the backend name ourselves: exceptions, unlike the
        # LOG calls above, do not apply a separately passed mapping to
        # the message template.
        msg = _("[%(be)s] Ceph client failed to connect.") % {
            "be": self.backend_name}
        raise exception.ShareBackendException(msg=msg)
    else:
        LOG.info("[%(be)s] Ceph client connection complete.",
                 {"be": self.backend_name})

    return self._rados_client
@property
def volname(self):
    """Name of the CephFS volume/filesystem used by the driver.

    This is where the driver creates manila entities such as shares,
    sharegroups, snapshots, etc. The 'cephfs_filesystem_name' option
    wins; without it the cluster must expose exactly one filesystem.

    :raises ShareBackendException: if no filesystem, or more than one,
        is found and the option is not set.
    """
    if self._volname:
        return self._volname

    self._volname = self.configuration.safe_get('cephfs_filesystem_name')
    if self._volname:
        return self._volname

    volumes = rados_command(
        self.rados_client, "fs volume ls", json_obj=True)
    volume_count = len(volumes)
    if volume_count > 1:
        msg = _("Specify Ceph filesystem name using "
                "'cephfs_filesystem_name' driver option.")
        raise exception.ShareBackendException(msg=msg)
    if volume_count != 1:
        msg = _("No Ceph filesystem found.")
        raise exception.ShareBackendException(msg=msg)

    self._volname = volumes[0]['name']
    return self._volname
def create_share(self, context, share, share_server=None):
    """Create a CephFS subvolume backing a share.

    :param context: A RequestContext.
    :param share: A Share.
    :param share_server: Always None for CephFS native.
    :return: The export locations dictionary.
    :raises ShareBackendException: if the share protocol differs from
        the configured protocol helper type.
    """
    requested_proto = share['share_proto'].upper()
    supported_proto = (
        self.configuration.cephfs_protocol_helper_type.upper())
    if requested_proto != supported_proto:
        msg = _("Share protocol %s is not supported.") % requested_proto
        raise exception.ShareBackendException(msg=msg)

    size_in_bytes = self._to_bytes(share['size'])
    group_id = share['share_group_id']

    LOG.debug("[%(be)s]: create_share: id=%(id)s, size=%(sz)s, "
              "group=%(gr)s.",
              {"be": self.backend_name, "id": share['id'],
               "sz": share['size'], "gr": group_id})

    # create FS subvolume/share
    create_args = {
        "vol_name": self.volname,
        "sub_name": share["id"],
        "size": size_in_bytes,
        "namespace_isolated": True,
        "mode": self._cephfs_volume_mode
    }
    if group_id is not None:
        create_args["group_name"] = group_id

    rados_command(self.rados_client, "fs subvolume create", create_args)

    return self._get_export_locations(share)
def _get_subvolume_size_in_gb(self, subvolume_size):
    """Return the subvolume size in GiB, rounded up to a whole number."""
    # A size such as 2.5 GiB is possible, hence the rounding up.
    size_in_gib = int(subvolume_size) / units.Gi
    return int(math.ceil(size_in_gib))
def manage_existing(self, share, driver_options):
    """Bring an existing FS subvolume under manila management.

    The subvolume name is taken from the path of the share's first
    export location. Subvolumes with an "infinite" quota are resized to
    the 'size' driver option; subvolumes whose quota is not a whole
    number of GiB are resized up to the next whole GiB.

    :param share: the share being managed.
    :param driver_options: dict of options; 'size' (GiB, positive
        integer) is required for subvolumes with an infinite quota.
    :return: dict with the managed 'size' (GiB) and 'export_locations'.
    :raises ShareBackendException: if the subvolume name is missing, the
        subvolume cannot be inspected, or a valid size was not given.
    :raises ShareShrinkingPossibleDataLoss: if the requested size would
        shrink the subvolume below what it holds.
    """
    LOG.debug("[%(be)s]: manage_existing: id=%(id)s.",
              {"be": self.backend_name, "id": share['id']})

    # Subvolume name must be provided.
    subvolume_name = share['export_locations'][0]['path']
    if not subvolume_name:
        raise exception.ShareBackendException(
            "The subvolume name must be provided as a 'export_path' while "
            "managing shares.")

    argdict = {
        "vol_name": self.volname,
        "sub_name": subvolume_name,
    }

    # Try to get the subvolume info in the ceph backend
    try:
        subvolume_info = rados_command(
            self.rados_client, "fs subvolume info", argdict, json_obj=True)
    except exception.ShareBackendException as e:
        # Couldn't find a subvolume with the name provided.
        if 'does not exist' in str(e).lower():
            msg = ("Subvolume %(subvol)s cannot be found on the "
                   "backend." % {'subvol': subvolume_name})
            raise exception.ShareBackendException(msg=msg)
        # Any other failure leaves us with nothing to inspect; surface
        # it instead of failing later on missing data.
        raise

    # Check if share mode matches
    if subvolume_info.get('mode') != self._cephfs_volume_mode:
        LOG.info("Subvolume %(subvol)s mode is different from what is "
                 "configured in Manila.", {'subvol': subvolume_name})

    subvolume_size = subvolume_info.get('bytes_quota')

    # NOTE(review): _resize_share() resolves the subvolume through
    # _get_subvolume_name(share['id']), while the private storage entry
    # is only written further down - confirm the resize calls below hit
    # the subvolume being managed.

    # We need to resize infinite subvolumes, as Manila doesn't support it
    if isinstance(subvolume_size, str) and subvolume_size == "infinite":
        # A size must be supplied; accept an integer or its string form.
        try:
            new_size = int(driver_options.get('size'))
        except (TypeError, ValueError):
            new_size = 0
        if new_size <= 0:
            msg = ("subvolume %s has infinite size and a valid "
                   "integer value was not added to the driver_options "
                   "arg. Please provide a 'size' in the driver "
                   "options and try again." % subvolume_name)
            raise exception.ShareBackendException(msg=msg)

        try:
            # Attempt resizing the subvolume
            self._resize_share(share, new_size, no_shrink=True)
        except exception.ShareShrinkingPossibleDataLoss:
            msg = ("Could not resize the subvolume using the provided "
                   "size, as data could be lost. Please update it and "
                   "try again.")
            LOG.exception(msg)
            raise
        subvolume_size = new_size
    elif int(subvolume_size) % units.Gi == 0:
        # subvolume_size is an integer GB, no need to resize subvolume
        subvolume_size = self._get_subvolume_size_in_gb(subvolume_size)
    else:
        # subvolume size is not an integer GB. need to resize subvolume
        new_size_gb = self._get_subvolume_size_in_gb(subvolume_size)
        LOG.info(
            "Subvolume %(subvol)s is being resized to %(new_size)s "
            "GB.", {
                'subvol': subvolume_name,
                'new_size': new_size_gb
            }
        )
        self._resize_share(share, new_size_gb, no_shrink=True)
        subvolume_size = new_size_gb

    # Remember which subvolume backs this share.
    share_metadata = {"subvolume_name": subvolume_name}
    self.private_storage.update(share['id'], share_metadata)

    export_locations = self._get_export_locations(
        share, subvolume_name=subvolume_name
    )

    managed_share = {
        "size": subvolume_size,
        "export_locations": export_locations
    }
    return managed_share
def manage_existing_snapshot(self, snapshot, driver_options):
    """Bring an existing FS subvolume snapshot under manila management.

    :param snapshot: the snapshot being managed; its 'provider_location'
        must hold the name of the subvolume snapshot.
    :param driver_options: not used.
    :return: dict with 'provider_location' and, when the backend
        reports a 'bytes_quota', the 'size' in GiB.
    :raises ShareBackendException: if the snapshot name is missing, or
        the subvolume or the snapshot cannot be inspected.
    """
    LOG.debug("[%(be)s]: manage_existing_snapshot: id=%(id)s.",
              {"be": self.backend_name, "id": snapshot['id']})

    # Subvolume snapshot name must be provided.
    sub_snapshot_name = snapshot.get('provider_location', None)
    if not sub_snapshot_name:
        raise exception.ShareBackendException(
            "The subvolume snapshot name must be provided as the "
            "'provider_location' while managing snapshots.")

    sub_name = self._get_subvolume_name(snapshot['share_instance_id'])

    argdict = {
        "vol_name": self.volname,
        "sub_name": sub_name,
    }

    # Make sure the parent subvolume is known to the ceph backend.
    try:
        rados_command(
            self.rados_client, "fs subvolume info", argdict, json_obj=True)
    except exception.ShareBackendException as e:
        # Couldn't find a subvolume with the name provided.
        if 'does not exist' in str(e).lower():
            msg = ("Subvolume %(subvol)s cannot be found on the "
                   "backend." % {'subvol': sub_name})
            raise exception.ShareBackendException(msg=msg)
        # Do not hide other backend failures.
        raise

    sub_snap_info_argdict = {
        "vol_name": self.volname,
        "sub_name": sub_name,
        "snap_name": sub_snapshot_name
    }
    # Shares/subvolumes already managed by manila will never have
    # infinite as their bytes_quota, so no need for extra precaution.
    try:
        managed_subvolume_snapshot = rados_command(
            self.rados_client, "fs subvolume snapshot info",
            sub_snap_info_argdict, json_obj=True
        )
    except exception.ShareBackendException as e:
        # Couldn't find a subvolume snapshot with the name provided.
        if 'does not exist' in str(e).lower():
            msg = ("Subvolume snapshot %(snap)s cannot be found on the "
                   "backend." % {'snap': sub_snapshot_name})
            raise exception.ShareBackendException(msg=msg)
        # Without the snapshot info there is nothing to return below.
        raise

    # Remember which backend snapshot backs this manila snapshot.
    snapshot_metadata = {"subvolume_snapshot_name": sub_snapshot_name}
    self.private_storage.update(
        snapshot['snapshot_id'], snapshot_metadata
    )

    # NOTE(carloss): fs subvolume snapshot info command does not return
    # the snapshot size, so we reuse the share size until this is not
    # available for us.
    managed_snapshot = {'provider_location': sub_snapshot_name}
    if managed_subvolume_snapshot.get('bytes_quota') is not None:
        managed_snapshot['size'] = self._get_subvolume_size_in_gb(
            managed_subvolume_snapshot['bytes_quota'])

    return managed_snapshot
def _need_to_cancel_clone(self, share, clone_name):
    """Tell whether a clone operation must be canceled before deletion.

    :param share: the share about to be deleted.
    :param clone_name: name of the (possible) clone subvolume.
    :return: True when the reported clone status is pending or
        in-progress, False otherwise.
    :raises ShareBackendException: if the status query fails for a
        reason other than the subvolume not being a clone.
    """
    status_args = {
        "vol_name": self.volname,
        "clone_name": clone_name,
    }
    group_id = share['share_group_id']
    if group_id is not None:
        status_args["group_name"] = group_id

    try:
        # NOTE(review): without json_obj=True this is the raw output
        # string of the command - confirm it is a bare status word and
        # not a JSON document, or this comparison can never match.
        status = rados_command(
            self.rados_client, "fs clone status", status_args)
    except exception.ShareBackendException as e:
        # Trying to get clone status on a regular subvolume is expected
        # to fail.
        if 'not allowed on subvolume' not in str(e).lower():
            raise exception.ShareBackendException(
                "Failed to remove share.")
        return False

    return status in (CLONE_PENDING, CLONE_INPROGRESS)
def delete_share(self, context, share, share_server=None):
    """Remove the FS subvolume backing a share.

    An ongoing clone operation on the subvolume is canceled first.

    :param context: not used.
    :param share: the share to delete.
    :param share_server: not used.
    :raises ShareBackendException: if canceling the clone or removing
        the subvolume fails.
    """
    LOG.debug("[%(be)s]: delete_share: id=%(id)s, group=%(gr)s.",
              {"be": self.backend_name, "id": share['id'],
               "gr": share['share_group_id']})

    subvolume_name = self._get_subvolume_name(share['id'])
    if self._need_to_cancel_clone(share, subvolume_name):
        argdict = {
            "vol_name": self.volname,
            "clone_name": subvolume_name,
            "force": True,
        }
        if share['share_group_id'] is not None:
            argdict.update({"group_name": share["share_group_id"]})

        try:
            rados_command(self.rados_client, "fs clone cancel", argdict)
        except exception.ShareBackendException:
            # rados_command wraps every failure, rados.Error included,
            # in ShareBackendException, so that is what must be caught
            # here. Keep the details in the log.
            LOG.exception("[%(be)s]: canceling the clone operation on "
                          "%(name)s failed.",
                          {"be": self.backend_name, "name": subvolume_name})
            raise exception.ShareBackendException(
                "Failed to cancel clone operation.")

    argdict = {
        "vol_name": self.volname,
        "sub_name": subvolume_name,
        # We want to clean up the share even if the subvolume is
        # not in a good state.
        "force": True,
    }
    if share['share_group_id'] is not None:
        argdict.update({"group_name": share["share_group_id"]})

    rados_command(self.rados_client, "fs subvolume rm", argdict)
def update_access(self, context, share, access_rules, add_rules,
                  delete_rules, update_rules, share_server=None):
    """Forward an access update to the protocol helper.

    The subvolume name backing the share is resolved here and passed
    along as ``sub_name``.

    :return: whatever the protocol helper returns.
    """
    helper_kwargs = {
        "share_server": share_server,
        "sub_name": self._get_subvolume_name(share['id']),
    }
    return self.protocol_helper.update_access(
        context, share, access_rules, add_rules, delete_rules,
        update_rules, **helper_kwargs)
def get_backend_info(self, context):
    """Return the backend info reported by the protocol helper."""
    helper = self.protocol_helper
    return helper.get_backend_info(context)
def ensure_shares(self, context, shares):
    """Build the per-share updates reported when ensuring shares.

    Every share gets a 'reapply_access_rules' flag taken from the
    protocol helper, plus its export locations and metadata. A share
    whose lookup fails with a "does not exist" backend error is reported
    with an error status instead.

    :param context: not used.
    :param shares: iterable of share instances.
    :return: dict mapping share instance id to its update dict.
    """
    share_updates = {}
    for share in shares:
        share_id = share['id']
        update = {
            'reapply_access_rules':
                self.protocol_helper.reapply_rules_while_ensuring_shares,
        }
        share_updates[share_id] = update
        try:
            creation_data = self.get_optional_share_creation_data(share)
            share_metadata = creation_data.get("metadata", {})
            export_locations = self._get_export_locations(share)
        except exception.ShareBackendException as e:
            if 'does not exist' in str(e).lower():
                msg = ("Share instance %(si)s belonging to share "
                       "%(share)s cannot be found on the backend.")
                msg_payload = {'si': share_id,
                               'share': share['share_id']}
                LOG.exception(msg, msg_payload)
                share_updates[share_id] = {
                    'status': constants.STATUS_ERROR,
                }
        else:
            update['export_locations'] = export_locations
            update["metadata"] = share_metadata
    return share_updates
def _resize_share(self, share, new_size, no_shrink=False):
    """Resize the subvolume backing a share.

    :param share: the share to resize.
    :param new_size: the new size, in GiB.
    :param no_shrink: when True, pass "no_shrink" to the backend.
    :raises ShareShrinkingPossibleDataLoss: if the backend reports that
        the new size "would be lesser than" what is in use.
    :raises ShareBackendException: for any other backend failure.
    """
    resize_args = {
        "vol_name": self.volname,
        "sub_name": self._get_subvolume_name(share["id"]),
        "new_size": self._to_bytes(new_size),
    }
    group_id = share["share_group_id"]
    if group_id is not None:
        resize_args["group_name"] = group_id
    if no_shrink:
        resize_args["no_shrink"] = True

    try:
        rados_command(self.rados_client, "fs subvolume resize", resize_args)
    except exception.ShareBackendException as err:
        if 'would be lesser than' not in str(err).lower():
            raise
        raise exception.ShareShrinkingPossibleDataLoss(
            share_id=share['id'])
def extend_share(self, share, new_size, share_server=None):
    """Grow the FS subvolume backing a share to ``new_size`` GiB."""
    log_fields = {"be": self.backend_name, "id": share['id'],
                  "sz": new_size}
    LOG.debug("[%(be)s]: extend_share: share=%(id)s, size=%(sz)s.",
              log_fields)

    self._resize_share(share, new_size)
def shrink_share(self, share, new_size, share_server=None):
    """Resize the FS subvolume backing a share down to ``new_size`` GiB.

    The resize is requested with ``no_shrink=True``.
    """
    log_fields = {"be": self.backend_name, "id": share['id'],
                  "sz": new_size}
    LOG.debug("[%(be)s]: shrink_share: share=%(id)s, size=%(sz)s.",
              log_fields)

    self._resize_share(share, new_size, no_shrink=True)
def create_snapshot(self, context, snapshot, share_server=None):
    """Create a FS snapshot of the subvolume backing a share.

    :return: dict whose 'provider_location' is the snapshot name used on
        the backend (the snapshot's 'snapshot_id').
    """
    LOG.debug("[%(be)s]: create_snapshot: original share=%(id)s, "
              "snapshot=%(sn)s.",
              {"be": self.backend_name, "id": snapshot['share_id'],
               "sn": snapshot['id']})

    snap_name = snapshot["snapshot_id"]
    rados_command(
        self.rados_client,
        "fs subvolume snapshot create",
        {
            "vol_name": self.volname,
            "sub_name": self._get_subvolume_name(snapshot["share_id"]),
            "snap_name": snap_name,
        })

    return {"provider_location": snapshot["snapshot_id"]}
def delete_snapshot(self, context, snapshot, share_server=None):
    """Delete a FS snapshot, trying the legacy and the current name.

    Both removals are issued with "force" set.
    """
    LOG.debug("[%(be)s]: delete_snapshot: snapshot=%(id)s.",
              {"be": self.backend_name, "id": snapshot['id']})

    current_snap_name = self._get_subvolume_snapshot_name(
        snapshot['snapshot_id'])
    # FIXME(vkmc) remove this in CC (next tick) release.
    legacy_snap_name = "%s_%s" % (snapshot["snapshot_id"], snapshot["id"])

    base_args = {
        "vol_name": self.volname,
        "sub_name": self._get_subvolume_name(snapshot["share_id"]),
        "snap_name": legacy_snap_name,
        "force": True,
    }

    # First the legacy name, then - in case it's a snapshot with new
    # naming - the current one.
    for snap_name in (legacy_snap_name, current_snap_name):
        rm_args = dict(base_args, snap_name=snap_name)
        rados_command(
            self.rados_client, "fs subvolume snapshot rm", rm_args)
def create_share_group(self, context, sg_dict, share_server=None):
    """Create the FS subvolume group that backs a share group.

    :param context: not used by this method.
    :param sg_dict: share group mapping; ``sg_dict['id']`` is used as the
        subvolume group name.
    :param share_server: not used by this method.
    """
    # create a FS group
    LOG.debug("[%(be)s]: create_share_group: share_group=%(id)s.",
              {"be": self.backend_name, "id": sg_dict['id']})

    rados_command(
        self.rados_client,
        "fs subvolumegroup create",
        {
            "vol_name": self.volname,
            "group_name": sg_dict['id'],
            "mode": self._cephfs_volume_mode,
        })
def delete_share_group(self, context, sg_dict, share_server=None):
    """Force-remove the FS subvolume group that backs a share group.

    :param context: not used by this method.
    :param sg_dict: share group mapping; ``sg_dict['id']`` is the
        subvolume group name.
    :param share_server: not used by this method.
    """
    # delete a FS group
    LOG.debug("[%(be)s]: delete_share_group: share_group=%(id)s.",
              {"be": self.backend_name, "id": sg_dict['id']})

    rados_command(
        self.rados_client,
        "fs subvolumegroup rm",
        {
            "vol_name": self.volname,
            "group_name": sg_dict['id'],
            "force": True,
        })
def delete_share_group_snapshot(self, context, snap_dict,
                                share_server=None):
    """Force-remove a FS subvolume group snapshot.

    :param context: not used by this method.
    :param snap_dict: group snapshot mapping; ``snap_dict['id']`` is the
        snapshot name and ``snap_dict['share_group_id']`` the group name
        sent to the backend.
    :param share_server: not used by this method.
    :returns: the tuple ``(None, [])``.
    """
    group_id = snap_dict["share_group_id"]
    snapshot_id = snap_dict["id"]

    # The log fields mirror the command arguments below: the group is
    # 'share_group_id' and the snapshot is 'id'.
    LOG.debug("[%(be)s]: delete_share_group_snapshot: "
              "share_group=%(sg_id)s, snapshot=%(sn)s.",
              {"be": self.backend_name, "sg_id": group_id,
               "sn": snapshot_id})

    argdict = {
        "vol_name": self.volname,
        "group_name": group_id,
        "snap_name": snapshot_id,
        "force": True,
    }

    rados_command(
        self.rados_client, "fs subvolumegroup snapshot rm", argdict)

    return None, []
def create_share_group_snapshot(self, context, snap_dict,
                                share_server=None):
    """Reject creation of a FS group snapshot.

    The request is logged and then always refused.

    :param context: not used by this method.
    :param snap_dict: group snapshot mapping; ``share_group_id`` and
        ``id`` are read for logging.
    :param share_server: not used by this method.
    :raises ShareBackendException: unconditionally.
    """
    LOG.debug("[%(be)s]: create_share_group_snapshot: share_group=%(id)s, "
              "snapshot=%(sn)s.",
              {"be": self.backend_name, "id": snap_dict['share_group_id'],
               "sn": snap_dict["id"]})

    raise exception.ShareBackendException(
        msg=_("Share group snapshot feature is no longer supported in "
              "mainline CephFS (existing group snapshots can still be "
              "listed and deleted)."))
def _get_clone_status(self, share):
    """Check the status of a newly cloned share.

    :param share: share mapping; ``id`` and ``share_group_id`` are read.
    :returns: the ``['status']['state']`` value of the ``fs clone status``
        command output.
    """
    argdict = {
        "vol_name": self.volname,
        "clone_name": self._get_subvolume_name(share["id"]),
    }
    group_id = share['share_group_id']
    if group_id is not None:
        argdict["group_name"] = group_id

    clone_info = rados_command(self.rados_client,
                               "fs clone status", argdict, True)
    return clone_info['status']['state']
def _update_create_from_snapshot_status(self, share):
    """Map the backend clone state of ``share`` to manila share updates.

    :param share: share mapping; ``id`` and ``snapshot_id`` are read here.
    :returns: dict with ``status``, ``progress`` and ``export_locations``.
        A completed clone yields an available share with its export
        locations; a pending or in-progress clone keeps the share in the
        creating-from-snapshot state with no progress or exports.
    :raises ShareBackendException: if the clone state is neither
        complete, pending nor in progress.
    """
    status = self._get_clone_status(share)

    if status == CLONE_COMPLETE:
        return {
            'status': constants.STATUS_AVAILABLE,
            'progress': '100%',
            'export_locations': self._get_export_locations(share),
        }

    if status in (CLONE_PENDING, CLONE_INPROGRESS):
        return {
            'status': constants.STATUS_CREATING_FROM_SNAPSHOT,
            'progress': None,
            'export_locations': [],
        }

    # error if clone operation is not progressing or completed
    raise exception.ShareBackendException(
        "rados client clone of snapshot [%(sn)s] to new "
        "share [%(shr)s] did not complete successfully." %
        {"sn": share["snapshot_id"], "shr": share["id"]})
def get_share_status(self, share, share_server=None):
    """Returns the current status for a share.

    Only shares in the creating-from-snapshot state are processed; any
    other state is logged and skipped.

    :param share: a manila share.
    :param share_server: a manila share server (not currently supported).
    :returns: manila share status updates, or None when the share is not
        being created from a snapshot.
    """
    current_status = share['status']
    if current_status == constants.STATUS_CREATING_FROM_SNAPSHOT:
        return self._update_create_from_snapshot_status(share)

    LOG.warning("Caught an unexpected share status '%s' during share "
                "status update routine. Skipping.", current_status)
    return None
def create_share_from_snapshot(self, context, share, snapshot,
                               share_server=None, parent_share=None):
    """Create a CephFS subvolume from a snapshot

    Issues ``fs subvolume snapshot clone`` and then reports the clone
    state through ``_update_create_from_snapshot_status``.

    :param context: not used by this method.
    :param share: the new share; ``id``, ``size`` and ``share_group_id``
        are read.
    :param snapshot: the source snapshot; ``id`` and ``snapshot_id`` are
        read.
    :param share_server: not used by this method.
    :param parent_share: share owning the snapshot; ``parent_share['id']``
        is read, so callers must supply it.
    :returns: the share updates produced for the clone's current state.
    """
    group_id = share['share_group_id']

    LOG.debug("[%(be)s]: create_share_from_snapshot: id=%(id)s, "
              "snapshot=%(sn)s, size=%(sz)s, group=%(gr)s.",
              {"be": self.backend_name, "id": share['id'],
               "sn": snapshot['id'], "sz": share['size'],
               "gr": group_id})

    clone_args = {
        "vol_name": self.volname,
        "sub_name": self._get_subvolume_name(parent_share["id"]),
        "snap_name": self._get_subvolume_snapshot_name(
            snapshot["snapshot_id"]),
        "target_sub_name": self._get_subvolume_name(share["id"]),
    }
    if group_id is not None:
        clone_args["group_name"] = group_id

    rados_command(
        self.rados_client, "fs subvolume snapshot clone", clone_args)

    return self._update_create_from_snapshot_status(share)
def __del__(self):
    """Shut down the Ceph client connection, if one is held."""
    # __del__ also runs for instances whose __init__ raised before
    # '_rados_client' was assigned, so the attribute may be missing.
    client = getattr(self, '_rados_client', None)
    if not client:
        return

    LOG.info("[%(be)s] Ceph client disconnecting...",
             {"be": self.backend_name})
    client.shutdown()
    self._rados_client = None
    LOG.info("[%(be)s] Ceph client disconnected",
             {"be": self.backend_name})
def get_configured_ip_versions(self):
    """Return the IP versions reported by the active protocol helper."""
    helper = self.protocol_helper
    return helper.get_configured_ip_versions()
def transfer_accept(self, context, share, new_user, new_project,
                    access_rules=None, share_server=None):
    """Refuse cross-project transfers of shares that have access rules.

    :param context: not used by this method.
    :param share: share mapping; ``share['project_id']`` is compared with
        ``new_project``.
    :param new_user: not used by this method.
    :param new_project: project the share is being transferred to.
    :param access_rules: access rules currently on the share, if any.
    :param share_server: not used by this method.
    :raises DriverCannotTransferShareWithRules: if ``access_rules`` is
        non-empty and the target project differs from the share's.
    """
    # CephFS driver cannot transfer shares by preserving access rules
    if share["project_id"] == new_project:
        return
    if access_rules:
        raise exception.DriverCannotTransferShareWithRules()
class NativeProtocolHelper(ganesha.NASHelperBase):
    """Helper class for native CephFS protocol"""

    supported_access_types = (CEPHX_ACCESS_TYPE, )
    supported_access_levels = (constants.ACCESS_LEVEL_RW,
                               constants.ACCESS_LEVEL_RO)
    reapply_rules_while_ensuring_shares = False

    def __init__(self, execute, config, **kwargs):
        """Take the Ceph handles out of ``kwargs`` and init the base class.

        :param execute: forwarded to the base class initializer.
        :param config: forwarded to the base class initializer.
        :param kwargs: must contain ``rados_client`` and ``volname``; both
            are removed before the rest is forwarded to the base class.
        """
        self.rados_client = kwargs.pop('rados_client')
        self.volname = kwargs.pop('volname')
        self.message_api = message_api.API()
        super(NativeProtocolHelper, self).__init__(execute, config,
                                                   **kwargs)

    def _init_helper(self):
        """No helper initialization is performed for the native protocol."""
        pass

    def check_for_setup_error(self):
        """Returns an error if prerequisites aren't met."""
        return

    def get_mon_addrs(self):
        """Return the monitor addresses listed by ``mon dump``.

        Each entry is the monitor's ``addr`` value truncated at its first
        ``/``.
        """
        mon_map = rados_command(self.rados_client, "mon dump", json_obj=True,
                                target=('mon', ))
        return [mon['addr'].split("/")[0] for mon in mon_map['mons']]

    def get_backend_info(self, context):
        """Return the configuration values describing this backend.

        :param context: not used by this method.
        """
        return {
            "cephfs_ensure_all_shares_salt":
                self.configuration.cephfs_ensure_all_shares_salt,
            "cephfs_filesystem_name": self.volname,
        }

    def get_export_locations(self, share, subvolume_path):
        """Build the export location dict for a share.

        :param share: share mapping; ``share['id']`` is read for logging.
        :param subvolume_path: path of the subvolume inside the filesystem.
        :returns: a single export location dict whose ``path`` is the
            comma-joined monitor addresses, a colon, and the path.
        """
        # To mount this you need to know the mon IPs and the path to the volume
        mon_addrs = self.get_mon_addrs()

        export_location = "{addrs}:{path}".format(
            addrs=",".join(mon_addrs),
            path=subvolume_path)

        LOG.info("Calculated export location for share %(id)s: %(loc)s",
                 {"id": share['id'], "loc": export_location})

        return {
            'path': export_location,
            'is_admin_only': False,
            'metadata': {},
        }

    def get_optional_share_creation_data(self, share, share_server=None):
        """Return share metadata carrying the ``fs=<volname>`` mount option."""
        return {"metadata": {"__mount_options": f"fs={self.volname}"}}

    def _allow_access(self, context, share, access, share_server=None,
                      sub_name=None):
        """Authorize a cephx ID on the subvolume backing ``share``.

        :param context: not used by this method.
        :param share: share mapping; ``project_id`` and ``share_group_id``
            are read.
        :param access: access rule mapping; ``access_type``, ``access_to``
            and ``access_level`` are read.
        :param share_server: not used by this method.
        :param sub_name: subvolume name sent to the backend.
        :returns: the result of the ``fs subvolume authorize`` command.
        :raises InvalidShareAccessType: if the rule is not a cephx rule.
        :raises InvalidShareAccess: if the ID equals the one configured in
            ``cephfs_auth_id``, or the backend reports it is not allowed.
        """
        if access['access_type'] != CEPHX_ACCESS_TYPE:
            raise exception.InvalidShareAccessType(type=access['access_type'])

        ceph_auth_id = access['access_to']

        # We need to check here rather than the API or Manila Client to see
        # if the ceph_auth_id is the same as the one specified for Manila's
        # usage. This is due to the fact that the API and the Manila client
        # cannot read the contents of the Manila configuration file. If it
        # is the same, we need to error out.
        if ceph_auth_id == CONF.cephfs_auth_id:
            error_message = (_('Ceph authentication ID %s must be different '
                               'than the one the Manila service uses.') %
                             ceph_auth_id)
            raise exception.InvalidShareAccess(reason=error_message)

        argdict = {
            "vol_name": self.volname,
            "sub_name": sub_name,
            "auth_id": ceph_auth_id,
            "tenant_id": share["project_id"],
        }
        if share["share_group_id"] is not None:
            argdict["group_name"] = share["share_group_id"]

        readonly = access['access_level'] == constants.ACCESS_LEVEL_RO
        argdict["access_level"] = "r" if readonly else "rw"

        try:
            auth_result = rados_command(
                self.rados_client, "fs subvolume authorize", argdict)
        except exception.ShareBackendException as e:
            if 'not allowed' in str(e).lower():
                msg = ("Access to client %(client)s is not allowed. "
                       "Reason: %(reason)s")
                msg_payload = {'client': ceph_auth_id, 'reason': e}
                raise exception.InvalidShareAccess(
                    reason=msg % msg_payload)
            raise

        return auth_result

    def _deny_access(self, context, share, access, share_server=None,
                     sub_name=None):
        """Deauthorize a cephx ID and evict its clients from the subvolume.

        Non-cephx rules are logged and ignored. If the backend reports the
        ID "doesn't exist", a warning is logged and no eviction is issued.

        :param context: not used by this method.
        :param share: share mapping; ``id`` and ``share_group_id`` are read.
        :param access: access rule mapping; ``access_type`` and
            ``access_to`` are read.
        :param share_server: not used by this method.
        :param sub_name: subvolume name sent to the backend.
        """
        if access['access_type'] != CEPHX_ACCESS_TYPE:
            LOG.warning("Invalid access type '%(type)s', "
                        "ignoring in deny.",
                        {"type": access['access_type']})
            return

        argdict = {
            "vol_name": self.volname,
            "sub_name": sub_name,
            "auth_id": access['access_to']
        }
        if share["share_group_id"] is not None:
            argdict["group_name"] = share["share_group_id"]

        try:
            rados_command(self.rados_client,
                          "fs subvolume deauthorize",
                          argdict)
        except exception.ShareBackendException as e:
            if "doesn't exist" in e.msg.lower():
                # Lazy %-style arguments: the text is only built when the
                # record is emitted, and no stray '%' ends up in the output.
                LOG.warning("%(access_to)s did not have access to "
                            "share %(id)s.",
                            {"access_to": access['access_to'],
                             "id": share['id']})
                return
            raise
        rados_command(self.rados_client, "fs subvolume evict", argdict)

    def _flag_rule_error(self, context, share, rule, detail, access_updates):
        """Record a user message, log the failure and mark ``rule`` as error.

        Must be called from inside an ``except`` block so that
        ``LOG.exception`` can attach the active traceback.
        """
        self.message_api.create(
            context,
            message_field.Action.UPDATE_ACCESS_RULES,
            share['project_id'],
            resource_type=message_field.Resource.SHARE,
            resource_id=share['share_id'],
            detail=detail)
        log_args = {'id': rule['access_id'],
                    'access_level': rule['access_level'],
                    'access_to': rule['access_to']}
        LOG.exception("Failed to provide %(access_level)s access to "
                      "%(access_to)s (Rule ID: %(id)s). Setting rule "
                      "to 'error' state.", log_args)
        access_updates[rule['access_id']] = {'state': 'error'}

    def update_access(self, context, share, access_rules, add_rules,
                      delete_rules, update_rules, share_server=None,
                      sub_name=None):
        """Apply access rule changes to the subvolume backing ``share``.

        When neither ``add_rules`` nor ``delete_rules`` is given (recovery
        or maintenance mode), every rule in ``access_rules`` is re-applied
        and any cephx ID the backend still lists as authorized but that is
        not among those rules is revoked.

        :param context: passed to the message API when a rule fails.
        :param share: share mapping; ``project_id``, ``share_id`` and
            ``share_group_id`` are read.
        :param access_rules: all rules that should exist on the share.
        :param add_rules: rules to grant.
        :param delete_rules: rules to revoke; the caller's list is not
            modified.
        :param update_rules: not used by this method.
        :param share_server: not used by this method.
        :param sub_name: subvolume name sent to the backend.
        :returns: dict keyed by rule ``access_id``; a granted rule maps to
            ``{'access_key': ...}`` and a rejected one to
            ``{'state': 'error'}``.
        """
        access_updates = {}

        # Work on a private copy so that rules synthesized during recovery
        # never leak into the list object owned by the caller.
        delete_rules = list(delete_rules) if delete_rules else []

        argdict = {
            "vol_name": self.volname,
            "sub_name": sub_name,
        }
        if share["share_group_id"] is not None:
            argdict["group_name"] = share["share_group_id"]

        if not (add_rules or delete_rules):  # recovery/maintenance mode
            add_rules = access_rules

            existing_auths = rados_command(
                self.rados_client, "fs subvolume authorized_list",
                argdict, json_obj=True)

            if existing_auths:
                # Each entry of the backend listing is iterated for the
                # cephx IDs it contains.
                existing_auth_ids = {
                    cephx_id
                    for auth in existing_auths
                    for cephx_id in auth
                }
                want_auth_ids = {rule['access_to'] for rule in add_rules}
                for stale_auth_id in existing_auth_ids - want_auth_ids:
                    delete_rules.append({
                        'access_to': stale_auth_id,
                        'access_type': CEPHX_ACCESS_TYPE,
                    })

        # During recovery mode, re-authorize share access for auth IDs that
        # were already granted access by the backend. Do this to fetch their
        # access keys and ensure that after recovery, manila and the Ceph
        # backend are in sync.
        for rule in add_rules:
            try:
                access_key = self._allow_access(
                    context, share, rule, sub_name=sub_name
                )
            except (exception.InvalidShareAccessLevel,
                    exception.InvalidShareAccessType):
                self._flag_rule_error(
                    context, share, rule,
                    message_field.Detail.UNSUPPORTED_CLIENT_ACCESS,
                    access_updates)
            except exception.InvalidShareAccess:
                self._flag_rule_error(
                    context, share, rule,
                    message_field.Detail.FORBIDDEN_CLIENT_ACCESS,
                    access_updates)
            else:
                access_updates[rule['access_id']] = {
                    'access_key': access_key,
                }

        for rule in delete_rules:
            self._deny_access(context, share, rule, sub_name=sub_name)

        return access_updates

    def get_configured_ip_versions(self):
        """Return the IP versions this helper reports: always ``[4]``."""
        return [4]
class NFSProtocolHelperMixin():
    """Export-location and IP-version logic shared by the NFS helpers.

    The methods read ``export_ips``, ``configured_ip_versions``,
    ``volname`` and ``rados_client`` from the instance and call its
    ``_get_export_ips()``; the class mixing this in must provide them.
    """

    def get_export_locations(self, share, subvolume_path):
        """Build one export location per known export IP.

        :param share: share mapping; ``share['id']`` is read for logging.
        :param subvolume_path: path appended after each server address.
        :returns: list of export location dicts; each one carries the
            ``preferred`` flag of its export IP in ``metadata``.
        """
        if not self.export_ips:
            self.export_ips = self._get_export_ips()

        locations = []
        for entry in self.export_ips:
            # Try to escape the export ip. If it fails, means that the
            # `cephfs_ganesha_server_ip` wasn't possibly set and the used
            # address is the hostname
            try:
                address = driver_helpers.escaped_address(entry['ip'])
            except ValueError:
                address = entry['ip']

            export_path = f"{address}:{subvolume_path}"

            LOG.info("Calculated export path for share %(id)s: %(epath)s",
                     {"id": share['id'], "epath": export_path})

            locations.append({
                'path': export_path,
                'is_admin_only': False,
                'metadata': {
                    'preferred': entry['preferred'],
                },
            })
        return locations

    def get_optional_share_creation_data(self, share, share_server=None):
        """Return extra share creation data; always an empty dict."""
        return {}

    def _get_export_path(self, share, sub_name=None):
        """Callback to provide export path."""
        argdict = {
            "vol_name": self.volname,
            "sub_name": sub_name or share["id"],
        }
        group_id = share["share_group_id"]
        if group_id is not None:
            argdict["group_name"] = group_id

        return rados_command(
            self.rados_client, "fs subvolume getpath", argdict)

    def _get_export_pseudo_path(self, share, sub_name=None):
        """Callback to provide pseudo path."""
        return self._get_export_path(share, sub_name=sub_name)

    def get_configured_ip_versions(self):
        """Return the IP versions of the export IPs as a list.

        The result is cached in ``configured_ip_versions``. If anything
        fails while collecting or parsing the export IPs, both 4 and 6 are
        reported.
        """
        if self.configured_ip_versions:
            return list(self.configured_ip_versions)

        try:
            if not self.export_ips:
                self.export_ips = self._get_export_ips()

            for entry in self.export_ips:
                parsed = ipaddress.ip_address(str(entry['ip']))
                self.configured_ip_versions.add(parsed.version)
        except Exception:
            # export_ips contained a hostname, safest thing is to
            # claim support for IPv4 and IPv6 address families
            LOG.warning("Setting configured IP versions to [4, 6] since "
                        "a hostname (rather than IP address) was supplied "
                        "in 'cephfs_ganesha_server_ip' or "
                        "in 'cephfs_ganesha_export_ips'.")
            self.configured_ip_versions = {4, 6}
        return list(self.configured_ip_versions)
class NFSProtocolHelper(NFSProtocolHelperMixin, ganesha.GaneshaNASHelper2):
    """Deprecated NFS helper for an NFS-Ganesha instance outside Ceph's
    orchestrator.
    """

    shared_data = {}
    supported_protocols = ('NFS',)
    reapply_rules_while_ensuring_shares = True

    def __init__(self, execute, config_object, **kwargs):
        """Pick an executor, resolve the Ganesha host and init the base.

        :param execute: local execute callable; wrapped in a root executor
            unless the Ganesha server is configured as remote, in which
            case an SSH executor replaces it.
        :param config_object: backend configuration.
        :param kwargs: forwarded to the base class; ``rados_client`` and
            ``volname`` are taken from it afterwards if the base class did
            not set those attributes.
        """
        if not config_object.cephfs_ganesha_server_is_remote:
            execute = ganesha_utils.RootExecutor(execute)
        else:
            execute = ganesha_utils.SSHExecutor(
                config_object.cephfs_ganesha_server_ip, 22, None,
                config_object.cephfs_ganesha_server_username,
                password=config_object.cephfs_ganesha_server_password,
                privatekey=config_object.cephfs_ganesha_path_to_private_key)

        configured_host = config_object.cephfs_ganesha_server_ip
        if configured_host:
            self.ganesha_host = configured_host
        else:
            self.ganesha_host = socket.gethostname()
            LOG.info("NFS-Ganesha server's location defaulted to driver's "
                     "hostname: %s", self.ganesha_host)

        super().__init__(execute, config_object, **kwargs)

        LOG.warning('The NFSProtocolHelper has been deprecated. Starting '
                    'from the 2025.1 release, we will no longer support '
                    'exporting NFS shares through a NFS Ganesha instance '
                    'that not managed by the Ceph orchestrator.')
        for attr_name in ('rados_client', 'volname'):
            if not hasattr(self, attr_name):
                setattr(self, attr_name, kwargs.pop(attr_name))
        self.export_ips = None
        self.configured_ip_versions = set()
        self.config = config_object

    def check_for_setup_error(self):
        """Returns an error if prerequisites aren't met."""
        validate_host = types.HostAddress()
        for candidate in self.config.cephfs_ganesha_export_ips:
            try:
                validate_host(candidate)
            except ValueError:
                msg = (_("Invalid list member of 'cephfs_ganesha_export_ips' "
                         "option supplied %s -- not a valid IP address or "
                         "hostname.") % candidate)
                raise exception.InvalidParameterValue(err=msg)

    def _default_config_hook(self):
        """Callback to provide default export block."""
        default_conf = super()._default_config_hook()
        conf_dir = ganesha_utils.path_from(__file__, "conf")
        ganesha_utils.patch(default_conf, self._load_conf_dir(conf_dir))
        return default_conf

    def _fsal_hook(self, base, share, access, sub_name=None):
        """Callback to create FSAL subblock."""
        ceph_auth_id = 'ganesha-' + share['id']

        authorize_args = {
            "vol_name": self.volname,
            "sub_name": sub_name,
            "auth_id": ceph_auth_id,
            "access_level": "rw",
            "tenant_id": share["project_id"],
        }
        group_id = share["share_group_id"]
        if group_id is not None:
            authorize_args["group_name"] = group_id

        access_key = rados_command(
            self.rados_client, "fs subvolume authorize", authorize_args)

        # Restrict Ganesha server's access to only the CephFS subtree or path,
        # corresponding to the manila share, that is to be exported by making
        # Ganesha use Ceph auth IDs with path restricted capabilities to
        # communicate with CephFS.
        return {
            'Name': 'Ceph',
            'User_Id': ceph_auth_id,
            'Secret_Access_Key': access_key,
            'Filesystem': self.volname
        }

    def _cleanup_fsal_hook(self, base, share, access, sub_name=None):
        """Callback for FSAL specific cleanup after removing an export."""
        deauthorize_args = {
            "vol_name": self.volname,
            "sub_name": sub_name,
            "auth_id": 'ganesha-' + share['id'],
        }
        group_id = share["share_group_id"]
        if group_id is not None:
            deauthorize_args["group_name"] = group_id

        rados_command(
            self.rados_client, "fs subvolume deauthorize", deauthorize_args)

    def _get_export_ips(self):
        """Return the de-duplicated export IPs, all marked non-preferred.

        Falls back to the Ganesha host when no export IPs are configured.
        """
        configured = self.config.cephfs_ganesha_export_ips
        if not configured:
            configured = [self.ganesha_host]

        return [{'ip': ip, 'preferred': False} for ip in set(configured)]

    def get_backend_info(self, context):
        """Return the configuration values describing this backend.

        :param context: not used by this method.
        """
        return {
            "cephfs_ganesha_export_ips": self.config.cephfs_ganesha_export_ips,
            "cephfs_ganesha_server_ip": self.config.cephfs_ganesha_server_ip,
            "cephfs_ensure_all_shares_salt":
                self.configuration.cephfs_ensure_all_shares_salt,
        }
class NFSClusterProtocolHelper(NFSProtocolHelperMixin, ganesha.NASHelperBase):
    """NFS helper that manages exports through the Ceph ``nfs`` commands."""

    supported_access_types = ('ip', )
    supported_access_levels = (constants.ACCESS_LEVEL_RW,
                               constants.ACCESS_LEVEL_RO)
    reapply_rules_while_ensuring_shares = True

    def __init__(self, execute, config_object, **kwargs):
        """Take the Ceph handles out of ``kwargs`` and init the base class.

        :param execute: forwarded to the base class initializer.
        :param config_object: backend configuration; also stored as
            ``self.configuration``.
        :param kwargs: must contain ``rados_client`` and ``volname``; both
            are removed before the rest is forwarded to the base class.
        """
        self.rados_client = kwargs.pop('rados_client')
        self.volname = kwargs.pop('volname')
        self.configured_ip_versions = set()
        self.configuration = config_object
        self._nfs_clusterid = None
        self.export_ips = None
        super(NFSClusterProtocolHelper, self).__init__(execute,
                                                       config_object,
                                                       **kwargs)

    @property
    def nfs_clusterid(self):
        """ID of the NFS cluster where the driver exports shares.

        Read from the ``cephfs_nfs_cluster_id`` option on first use and
        cached afterwards.

        :raises ShareBackendException: if the option is not set.
        """
        if self._nfs_clusterid:
            return self._nfs_clusterid

        self._nfs_clusterid = (
            self.configuration.safe_get('cephfs_nfs_cluster_id'))

        if not self._nfs_clusterid:
            # The two sentences need an explicit separator; adjacent string
            # literals are concatenated verbatim.
            msg = _("The NFS Cluster ID has not been configured. "
                    "Please check cephfs_nfs_cluster_id option "
                    "has been correctly set in the backend configuration.")
            raise exception.ShareBackendException(msg=msg)

        return self._nfs_clusterid

    def _get_configured_export_ips(self):
        """Return the export IPs stated in the configuration as a set.

        ``cephfs_ganesha_export_ips`` wins; when it is empty the single
        ``cephfs_ganesha_server_ip`` value is used, if set.
        """
        ganesha_server_ips = (
            self.configuration.safe_get('cephfs_ganesha_export_ips') or [])
        if not ganesha_server_ips:
            ganesha_server_ips = (
                self.configuration.safe_get('cephfs_ganesha_server_ip'))
            ganesha_server_ips = (
                [ganesha_server_ips] if ganesha_server_ips else [])

        return set(ganesha_server_ips)

    def _get_export_ips(self):
        """Get NFS cluster export ips.

        :returns: list of ``{'ip': ..., 'preferred': bool}`` dicts. IPs
            reported by ``nfs cluster info`` are preferred; configured IPs
            not reported by the cluster are appended as non-preferred.
        :raises ShareBackendException: if the cluster reports no usable IP.
        """
        nfs_clusterid = self.nfs_clusterid
        configured_ips = self._get_configured_export_ips()

        output = rados_command(self.rados_client, "nfs cluster info",
                               {"cluster_id": nfs_clusterid})
        cluster_info = json.loads(output)[nfs_clusterid]

        # NFS has been deployed with an ingress
        # we use the VIP for the export ips
        vip = cluster_info["virtual_ip"]
        if vip:
            ceph_nfs_export_ips = [vip]
        else:
            # there is no VIP, we fallback to NFS cluster ips
            ceph_nfs_export_ips = [
                host["ip"] for host in cluster_info["backend"]]

        # there are no export IPs, there are no NFS servers we can use
        if not ceph_nfs_export_ips:
            msg = _("There are no NFS servers available to use. "
                    "Please check the health of your Ceph cluster "
                    "and restart the manila share service.")
            raise exception.ShareBackendException(msg=msg)

        cluster_ips = set(ceph_nfs_export_ips)
        export_ips = [{'ip': ip, 'preferred': True} for ip in cluster_ips]

        # It's possible for deployers to state additional
        # NFS interfaces directly via manila.conf. If they do,
        # these are represented as non-preferred export paths.
        # This is mostly to allow NFS-Ganesha server migrations.
        export_ips.extend(
            {'ip': ip, 'preferred': False}
            for ip in configured_ips if ip not in cluster_ips)

        return export_ips

    def check_for_setup_error(self):
        """Returns an error if prerequisites aren't met."""
        return

    def _get_export_config(self, share, access, sub_name=None):
        """Returns export configuration in JSON-encoded bytes.

        An export already known to the cluster is reused with only its
        ``clients`` replaced; otherwise a new export definition is built.
        """
        pseudo_path = self._get_export_pseudo_path(share, sub_name=sub_name)
        argdict = {
            "cluster_id": self.nfs_clusterid,
            "pseudo_path": pseudo_path
        }
        export = rados_command(
            self.rados_client, "nfs export info", argdict, json_obj=True)
        if export:
            export["clients"] = access
        else:
            export = {
                "path": self._get_export_path(share, sub_name=sub_name),
                "cluster_id": self.nfs_clusterid,
                "pseudo": pseudo_path,
                "squash": "none",
                "security_label": True,
                "fsal": {
                    "name": "CEPH",
                    "fs_name": self.volname,
                },
                "clients": access
            }
        return json.dumps(export).encode('utf-8')

    def _allow_access(self, share, access, sub_name=None):
        """Allow access to the share."""
        argdict = {
            "cluster_id": self.nfs_clusterid,
        }
        inbuf = self._get_export_config(share, access, sub_name)
        rados_command(self.rados_client,
                      "nfs export apply", argdict, inbuf=inbuf)

    def _deny_access(self, share, sub_name=None):
        """Deny access to the share."""
        argdict = {
            "cluster_id": self.nfs_clusterid,
            "pseudo_path": self._get_export_pseudo_path(
                share, sub_name=sub_name)
        }

        rados_command(self.rados_client, "nfs export rm", argdict)

    def update_access(self, context, share, access_rules, add_rules,
                      delete_rules, update_rules, share_server=None,
                      sub_name=None):
        """Update access rules of share.

        Creates an export per share. Modifies access rules of shares by
        dynamically updating exports via ceph nfs.

        Only ``access_rules`` drives the result; ``add_rules``,
        ``delete_rules`` and ``update_rules`` are not read.

        :returns: dict mapping the ``id`` of every rule that failed
            validation to ``{'state': 'error'}``.
        """
        rule_state_map = {}

        wanted_rw_clients, wanted_ro_clients = [], []
        for rule in access_rules:
            try:
                ganesha_utils.validate_access_rule(
                    self.supported_access_types, self.supported_access_levels,
                    rule, True)
            except (exception.InvalidShareAccess,
                    exception.InvalidShareAccessLevel):
                rule_state_map[rule['id']] = {'state': 'error'}
                continue

            rule = ganesha_utils.fixup_access_rule(rule)
            if rule['access_level'] == 'rw':
                wanted_rw_clients.append(rule['access_to'])
            elif rule['access_level'] == 'ro':
                wanted_ro_clients.append(rule['access_to'])

        if access_rules:
            # add or update export
            clients = []
            if wanted_ro_clients:
                clients.append({
                    'access_type': 'ro',
                    'addresses': wanted_ro_clients,
                    'squash': 'none'
                })
            if wanted_rw_clients:
                clients.append({
                    'access_type': 'rw',
                    'addresses': wanted_rw_clients,
                    'squash': 'none'
                })

            if clients:  # empty list if no rules passed validation
                self._allow_access(share, clients, sub_name=sub_name)
        else:
            # no clients have access to the share. remove export
            self._deny_access(share, sub_name=sub_name)

        return rule_state_map

    def get_backend_info(self, context):
        """Return the configuration values describing this backend.

        :param context: not used by this method.
        """
        return {
            "cephfs_ganesha_export_ips":
                self.configuration.cephfs_ganesha_export_ips,
            "cephfs_ganesha_server_ip":
                self.configuration.cephfs_ganesha_server_ip,
            "cephfs_nfs_cluster_id": self.nfs_clusterid,
            "cephfs_ensure_all_shares_salt":
                self.configuration.cephfs_ensure_all_shares_salt,
        }