Coverage for manila/share/drivers/zfsonlinux/driver.py: 99%
687 statements
« prev ^ index » next — coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2016 Mirantis Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16"""
17Module with ZFSonLinux share driver that utilizes ZFS filesystem resources
18and exports them as shares.
19"""
import functools
import math
import os
import time

from oslo_config import cfg
from oslo_log import log
from oslo_utils import importutils
from oslo_utils import strutils
from oslo_utils import timeutils

from manila.common import constants
from manila import exception
from manila.i18n import _
from manila.share import configuration
from manila.share import driver
from manila.share.drivers.zfsonlinux import utils as zfs_utils
from manila.share.manager import share_manager_opts  # noqa
from manila.share import share_types
from manila.share import utils as share_utils
from manila import utils
# Driver-specific configuration options. Values are read per backend stanza
# via the Configuration object; options marked 'Required' must be set there.
zfsonlinux_opts = [
    cfg.HostAddressOpt(
        "zfs_share_export_ip",
        required=True,
        help="IP to be added to user-facing export location. Required."),
    cfg.HostAddressOpt(
        "zfs_service_ip",
        required=True,
        help="IP to be added to admin-facing export location. Required."),
    cfg.ListOpt(
        "zfs_zpool_list",
        required=True,
        help="Specify list of zpools that are allowed to be used by backend. "
             "Can contain nested datasets. Examples: "
             "Without nested dataset: 'zpool_name'. "
             "With nested dataset: 'zpool_name/nested_dataset_name'. "
             "Required."),
    cfg.ListOpt(
        "zfs_dataset_creation_options",
        help="Define here list of options that should be applied "
             "for each dataset creation if needed. Example: "
             "compression=gzip,dedup=off. "
             "Note that, for secondary replicas option 'readonly' will be set "
             "to 'on' and for active replicas to 'off' in any way. "
             "Also, 'quota' will be equal to share size. Optional."),
    cfg.StrOpt(
        "zfs_dataset_name_prefix",
        default='manila_share_',
        help="Prefix to be used in each dataset name. Optional."),
    cfg.StrOpt(
        "zfs_dataset_snapshot_name_prefix",
        default='manila_share_snapshot_',
        help="Prefix to be used in each dataset snapshot name. Optional."),
    cfg.BoolOpt(
        "zfs_use_ssh",
        default=False,
        help="Remote ZFS storage hostname that should be used for SSH'ing. "
             "Optional."),
    cfg.StrOpt(
        "zfs_ssh_username",
        help="SSH user that will be used in 2 cases: "
             "1) By manila-share service in case it is located on different "
             "host than its ZFS storage. "
             "2) By manila-share services with other ZFS backends that "
             "perform replication. "
             "It is expected that SSH'ing will be key-based, passwordless. "
             "This user should be passwordless sudoer. Optional."),
    cfg.StrOpt(
        "zfs_ssh_user_password",
        secret=True,
        help="Password for user that is used for SSH'ing ZFS storage host. "
             "Not used for replication operations. They require "
             "passwordless SSH access. Optional."),
    cfg.StrOpt(
        "zfs_ssh_private_key_path",
        help="Path to SSH private key that should be used for SSH'ing ZFS "
             "storage host. Not used for replication operations. Optional."),
    cfg.ListOpt(
        "zfs_share_helpers",
        default=[
            "NFS=manila.share.drivers.zfsonlinux.utils.NFSviaZFSHelper",
        ],
        help="Specify list of share export helpers for ZFS storage. "
             "It should look like following: "
             "'FOO_protocol=foo.FooClass,BAR_protocol=bar.BarClass'. "
             "Required."),
    cfg.StrOpt(
        "zfs_replica_snapshot_prefix",
        default="tmp_snapshot_for_replication_",
        help="Set snapshot prefix for usage in ZFS replication. Required."),
    cfg.StrOpt(
        "zfs_migration_snapshot_prefix",
        default="tmp_snapshot_for_share_migration_",
        help="Set snapshot prefix for usage in ZFS migration. Required."),
]
# Register options globally so they show up in generated config references;
# per-backend values are still read through the Configuration object.
CONF = cfg.CONF
CONF.register_opts(zfsonlinux_opts)
LOG = log.getLogger(__name__)
def ensure_share_server_not_provided(f):
    """Decorator rejecting calls that carry a share server.

    This driver runs with driver_handles_share_servers=False, so any
    'share_server' or 'destination_share_server' kwarg is invalid input.

    :raises exception.InvalidInput: if a share server was provided.
    """
    # functools.wraps preserves the wrapped method's name/docstring, which
    # the bare closure previously clobbered (broke introspection/logging).
    @functools.wraps(f)
    def wrap(self, context, *args, **kwargs):
        server = kwargs.get(
            "share_server", kwargs.get("destination_share_server"))
        if server:
            raise exception.InvalidInput(
                reason=_("Share server handling is not available. "
                         "But 'share_server' was provided. '%s'. "
                         "Share network should not be used.") % server.get(
                             "id", server))
        return f(self, context, *args, **kwargs)

    return wrap
def get_backend_configuration(backend_name):
    """Build a Configuration object for the named backend stanza.

    :param backend_name: config group name of the (possibly remote) backend.
    :raises exception.BadConfigurationException: if no such stanza exists.
    """
    stanzas = CONF.list_all_sections()
    if backend_name not in stanzas:
        msg = _("Could not find backend stanza %(backend_name)s in "
                "configuration which is required for share replication and "
                "migration. Available stanzas are %(stanzas)s")
        raise exception.BadConfigurationException(
            reason=msg % {"stanzas": stanzas, "backend_name": backend_name})

    config = configuration.Configuration(
        driver.share_opts, config_group=backend_name)
    # Pull in every option group this driver needs to reach a peer backend.
    for opts in (zfsonlinux_opts, share_manager_opts, driver.ssh_opts):
        config.append_config_values(opts)

    return config
161class ZFSonLinuxShareDriver(zfs_utils.ExecuteMixin, driver.ShareDriver):
    def __init__(self, *args, **kwargs):
        """Initialize driver state from ZFS-specific configuration."""
        # [False]: this driver only supports DHSS=False mode.
        super(ZFSonLinuxShareDriver, self).__init__(
            [False], *args, config_opts=[zfsonlinux_opts], **kwargs)
        self.replica_snapshot_prefix = (
            self.configuration.zfs_replica_snapshot_prefix)
        self.migration_snapshot_prefix = (
            self.configuration.zfs_migration_snapshot_prefix)
        self.backend_name = self.configuration.safe_get(
            'share_backend_name') or 'ZFSonLinux'
        # Validated zpool names (duplicates rejected by _get_zpool_list).
        self.zpool_list = self._get_zpool_list()
        self.dataset_creation_options = (
            self.configuration.zfs_dataset_creation_options)
        self.share_export_ip = self.configuration.zfs_share_export_ip
        self.service_ip = self.configuration.zfs_service_ip
        self.private_storage = kwargs.get('private_storage')
        # protocol name -> helper instance; populated by _setup_helpers().
        self._helpers = {}

        # Set config based capabilities
        self._init_common_capabilities()

        # Cache of remote shell executors keyed by backend name.
        self._shell_executors = {}
    def _get_shell_executor_by_host(self, host):
        """Return a shell executor suitable for the given share host.

        Backends enabled in this manila-share service use the driver's own
        executor; any other backend gets a cached SSH executor built from
        that backend's configuration stanza.
        """
        backend_name = share_utils.extract_host(host, level='backend_name')
        if backend_name in CONF.enabled_share_backends:
            # Return executor of this host
            return self.execute
        elif backend_name not in self._shell_executors:
            config = get_backend_configuration(backend_name)
            self._shell_executors[backend_name] = (
                zfs_utils.get_remote_shell_executor(
                    ip=config.zfs_service_ip,
                    port=22,
                    conn_timeout=config.ssh_conn_timeout,
                    login=config.zfs_ssh_username,
                    password=config.zfs_ssh_user_password,
                    privatekey=config.zfs_ssh_private_key_path,
                    max_size=10,
                )
            )
        # Return executor of remote host
        return self._shell_executors[backend_name]
206 def _init_common_capabilities(self):
207 self.common_capabilities = {}
208 if 'dedup=on' in self.dataset_creation_options:
209 self.common_capabilities['dedupe'] = [True]
210 elif 'dedup=off' in self.dataset_creation_options:
211 self.common_capabilities['dedupe'] = [False]
212 else:
213 self.common_capabilities['dedupe'] = [True, False]
215 if 'compression=off' in self.dataset_creation_options:
216 self.common_capabilities['compression'] = [False]
217 elif any('compression=' in option
218 for option in self.dataset_creation_options):
219 self.common_capabilities['compression'] = [True]
220 else:
221 self.common_capabilities['compression'] = [True, False]
223 # NOTE(vponomaryov): Driver uses 'quota' approach for
224 # ZFS dataset. So, we can consider it as
225 # 'always thin provisioned' because this driver never reserves
226 # space for dataset.
227 self.common_capabilities['thin_provisioning'] = [True]
228 self.common_capabilities['max_over_subscription_ratio'] = (
229 self.configuration.max_over_subscription_ratio)
230 self.common_capabilities['qos'] = [False]
232 def _get_zpool_list(self):
233 zpools = []
234 for zpool in self.configuration.zfs_zpool_list:
235 zpool_name = zpool.split('/')[0]
236 if zpool_name in zpools:
237 raise exception.BadConfigurationException(
238 reason=_("Using the same zpool twice is prohibited. "
239 "Duplicate is '%(zpool)s'. List of zpools: "
240 "%(zpool_list)s.") % {
241 'zpool': zpool,
242 'zpool_list': ', '.join(
243 self.configuration.zfs_zpool_list)})
244 zpools.append(zpool_name)
245 return zpools
    @zfs_utils.zfs_dataset_synchronized
    def _delete_dataset_or_snapshot_with_retry(self, name):
        """Attempts to destroy some dataset or snapshot with retries."""
        # NOTE(vponomaryov): it is possible to see 'dataset is busy' error
        # under the load. So, we are ok to perform retry in this case.
        mountpoint = self.get_zfs_option(name, 'mountpoint')
        if '@' not in name:
            # NOTE(vponomaryov): check that dataset has no open files.
            # Snapshots ('@' in name) are never mounted, so only datasets
            # are polled with lsof here.
            start_point = time.time()
            while time.time() - start_point < 60:
                try:
                    out, err = self.execute('lsof', '-w', mountpoint)
                except exception.ProcessExecutionError:
                    # NOTE(vponomaryov): lsof returns code 1 if search
                    # didn't give results.
                    break
                LOG.debug("Cannot destroy dataset '%(name)s', it has "
                          "opened files. Will wait 2 more seconds. "
                          "Out: \n%(out)s", {
                              'name': name, 'out': out})
                time.sleep(2)
            else:
                # while/else: the loop ran the full 60s without lsof ever
                # coming up empty -> files are still open, give up.
                raise exception.ZFSonLinuxException(
                    msg=_("Could not destroy '%s' dataset, "
                          "because it had opened files.") % name)

        @utils.retry(retry_param=exception.ProcessExecutionError, retries=10)
        def _zfs_destroy_with_retry():
            """Retry destroying dataset ten times with exponential backoff."""
            # NOTE(bswartz): There appears to be a bug in ZFS when creating and
            # destroying datasets concurrently where the filesystem remains
            # mounted even though ZFS thinks it's unmounted. The most reliable
            # workaround I've found is to force the unmount, then attempt the
            # destroy, with short pauses around the unmount. (See bug#1546723)
            try:
                self.execute('sudo', 'umount', mountpoint)
            except exception.ProcessExecutionError:
                # Ignore failed umount, it's normal
                pass
            time.sleep(2)

            # NOTE(vponomaryov): Now, when no file usages and mounts of dataset
            # exist, destroy dataset.
            self.zfs('destroy', '-f', name)

        _zfs_destroy_with_retry()
294 def _setup_helpers(self):
295 """Setups share helper for ZFS backend."""
296 self._helpers = {}
297 helpers = self.configuration.zfs_share_helpers
298 if helpers:
299 for helper_str in helpers:
300 share_proto, __, import_str = helper_str.partition('=')
301 helper = importutils.import_class(import_str)
302 self._helpers[share_proto.upper()] = helper(
303 self.configuration)
304 else:
305 raise exception.BadConfigurationException(
306 reason=_(
307 "No share helpers selected for ZFSonLinux Driver. "
308 "Please specify using config option 'zfs_share_helpers'."))
310 def _get_share_helper(self, share_proto):
311 """Returns share helper specific for used share protocol."""
312 helper = self._helpers.get(share_proto)
313 if helper:
314 return helper
315 else:
316 raise exception.InvalidShare(
317 reason=_("Wrong, unsupported or disabled protocol - "
318 "'%s'.") % share_proto)
320 def do_setup(self, context):
321 """Perform basic setup and checks."""
322 super(ZFSonLinuxShareDriver, self).do_setup(context)
323 self._setup_helpers()
324 for ip in (self.share_export_ip, self.service_ip):
325 if not utils.is_valid_ip_address(ip, 4):
326 raise exception.BadConfigurationException(
327 reason=_("Wrong IP address provided: "
328 "%s") % self.share_export_ip)
330 if not self.zpool_list:
331 raise exception.BadConfigurationException(
332 reason=_("No zpools specified for usage: "
333 "%s") % self.zpool_list)
335 # Make pool mounts shared so that cloned namespaces receive unmounts
336 # and don't prevent us from unmounting datasets
337 for zpool in self.configuration.zfs_zpool_list:
338 self.execute('sudo', 'mount', '--make-rshared', ('/%s' % zpool))
340 if self.configuration.zfs_use_ssh:
341 # Check workability of SSH executor
342 self.ssh_executor('whoami')
    def _get_pools_info(self):
        """Returns info about all pools used by backend.

        Sizes are converted to float GiB; backend-wide capabilities are
        merged into each pool entry.
        """
        pools = []
        for zpool in self.zpool_list:
            free_size = self.get_zfs_option(zpool, 'free')
            free_size = utils.translate_string_size_to_float(free_size)
            total_size = self.get_zfs_option(zpool, 'size')
            total_size = utils.translate_string_size_to_float(total_size)
            pool = {
                'pool_name': zpool,
                'total_capacity_gb': float(total_size),
                'free_capacity_gb': float(free_size),
                'reserved_percentage':
                    self.configuration.reserved_share_percentage,
                'reserved_snapshot_percentage': (
                    self.configuration.reserved_share_from_snapshot_percentage
                    or self.configuration.reserved_share_percentage),
                'reserved_share_extend_percentage': (
                    self.configuration.reserved_share_extend_percentage
                    or self.configuration.reserved_share_percentage),
            }
            pool.update(self.common_capabilities)
            # Replication is advertised only when a replication domain is
            # configured for this backend.
            if self.configuration.replication_domain:
                pool['replication_type'] = 'readable'
            pools.append(pool)
        return pools
    def _update_share_stats(self):
        """Retrieves share stats info."""
        data = {
            'share_backend_name': self.backend_name,
            'storage_protocol': 'NFS',
            'reserved_percentage':
                self.configuration.reserved_share_percentage,
            'reserved_snapshot_percentage': (
                self.configuration.reserved_share_from_snapshot_percentage
                or self.configuration.reserved_share_percentage),
            'reserved_share_extend_percentage': (
                self.configuration.reserved_share_extend_percentage
                or self.configuration.reserved_share_percentage),
            'snapshot_support': True,
            'create_share_from_snapshot_support': True,
            'driver_name': 'ZFS',
            'pools': self._get_pools_info(),
        }
        # Backend-level replication flag mirrors the per-pool value set in
        # _get_pools_info().
        if self.configuration.replication_domain:
            data['replication_type'] = 'readable'
        super(ZFSonLinuxShareDriver, self)._update_share_stats(data)
393 def _get_share_name(self, share_id):
394 """Returns name of dataset used for given share."""
395 prefix = self.configuration.zfs_dataset_name_prefix or ''
396 return prefix + share_id.replace('-', '_')
398 def _get_snapshot_name(self, snapshot_id):
399 """Returns name of dataset snapshot used for given share snapshot."""
400 prefix = self.configuration.zfs_dataset_snapshot_name_prefix or ''
401 return prefix + snapshot_id.replace('-', '_')
    def _get_dataset_creation_options(self, share, is_readonly=False):
        """Returns list of options to be used for dataset creation.

        :param share: share dict; its 'size' becomes the dataset 'quota'.
        :param is_readonly: force 'readonly=on' (secondary replicas) instead
            of 'readonly=off'.
        :raises exception.ZFSonLinuxException: if requested extra specs
            conflict with the backend's configured capabilities.
        """
        options = ['quota=%sG' % share['size']]
        extra_specs = share_types.get_extra_specs_from_share(share)

        dedupe_set = False
        dedupe = extra_specs.get('dedupe')
        if dedupe:
            # Spec may look like '<is> True'; the last token is the value.
            dedupe = strutils.bool_from_string(
                dedupe.lower().split(' ')[-1], default=dedupe)
            if (dedupe in self.common_capabilities['dedupe']):
                options.append('dedup=%s' % ('on' if dedupe else 'off'))
                dedupe_set = True
            else:
                raise exception.ZFSonLinuxException(msg=_(
                    "Cannot use requested '%(requested)s' value of 'dedupe' "
                    "extra spec. It does not fit allowed value '%(allowed)s' "
                    "that is configured for backend.") % {
                        'requested': dedupe,
                        'allowed': self.common_capabilities['dedupe']})

        compression_set = False
        compression_type = extra_specs.get('zfsonlinux:compression')
        if compression_type:
            if (compression_type == 'off' and
                    False in self.common_capabilities['compression']):
                options.append('compression=off')
                compression_set = True
            elif (compression_type != 'off' and
                    True in self.common_capabilities['compression']):
                options.append('compression=%s' % compression_type)
                compression_set = True
            else:
                raise exception.ZFSonLinuxException(msg=_(
                    "Cannot use value '%s' of extra spec "
                    "'zfsonlinux:compression' because compression is disabled "
                    "for this backend. Set extra spec 'compression=True' to "
                    "make scheduler pick up appropriate backend."
                ) % compression_type)

        # Pass through configured creation options, skipping those managed
        # by the driver itself or already set from extra specs above.
        for option in self.dataset_creation_options or []:
            if any(v in option for v in (
                    'readonly', 'sharenfs', 'sharesmb', 'quota')):
                continue
            if 'dedup' in option and dedupe_set is True:
                continue
            if 'compression' in option and compression_set is True:
                continue
            options.append(option)
        if is_readonly:
            options.append('readonly=on')
        else:
            options.append('readonly=off')
        return options
    def _get_dataset_name(self, share):
        """Returns name of dataset used for given share.

        Honors a configured nested dataset ('pool/nested') matching the
        share's scheduled pool.
        """
        pool_name = share_utils.extract_host(share['host'], level='pool')

        # Pick pool with nested dataset name if set up
        for pool in self.configuration.zfs_zpool_list:
            pool_data = pool.split('/')
            if (pool_name == pool_data[0] and len(pool_data) > 1):
                pool_name = pool
                # Strip a trailing '/' left over from the config value.
                if pool_name[-1] == '/':
                    pool_name = pool_name[0:-1]
                break

        dataset_name = self._get_share_name(share['id'])
        full_dataset_name = '%(pool)s/%(dataset)s' % {
            'pool': pool_name, 'dataset': dataset_name}

        return full_dataset_name
    @ensure_share_server_not_provided
    def create_share(self, context, share, share_server=None):
        """Is called to create a share.

        Creates a ZFS dataset with computed options, records metadata in
        private storage, and returns export locations from the protocol
        helper.
        """
        options = self._get_dataset_creation_options(share, is_readonly=False)
        cmd = ['create']
        for option in options:
            cmd.extend(['-o', option])
        dataset_name = self._get_dataset_name(share)
        cmd.append(dataset_name)

        ssh_cmd = '%(username)s@%(host)s' % {
            'username': self.configuration.zfs_ssh_username,
            'host': self.service_ip,
        }
        pool_name = share_utils.extract_host(share['host'], level='pool')
        # Persist metadata before 'zfs create' so later operations
        # (delete, replication, migration) can locate the dataset.
        self.private_storage.update(
            share['id'], {
                'entity_type': 'share',
                'dataset_name': dataset_name,
                'ssh_cmd': ssh_cmd,  # used with replication and migration
                'pool_name': pool_name,  # used in replication
                'used_options': ' '.join(options),
            }
        )

        self.zfs(*cmd)

        return self._get_share_helper(
            share['share_proto']).create_exports(dataset_name)
    @ensure_share_server_not_provided
    def delete_share(self, context, share, share_server=None):
        """Is called to remove a share.

        Destroys the dataset's snapshots first, then its exports and the
        dataset itself; missing datasets only produce a warning.
        """
        pool_name = self.private_storage.get(share['id'], 'pool_name')
        pool_name = pool_name or share_utils.extract_host(
            share["host"], level="pool")
        dataset_name = self.private_storage.get(share['id'], 'dataset_name')
        if not dataset_name:
            # Fallback for shares whose metadata was never stored.
            dataset_name = self._get_dataset_name(share)

        out, err = self.zfs('list', '-r', pool_name)
        data = self.parse_zfs_answer(out)
        for datum in data:
            if datum['NAME'] != dataset_name:
                continue

            # Delete dataset's snapshots first
            out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
            snapshots = self.parse_zfs_answer(out)
            full_snapshot_prefix = (
                dataset_name + '@')
            for snap in snapshots:
                if full_snapshot_prefix in snap['NAME']:
                    self._delete_dataset_or_snapshot_with_retry(snap['NAME'])

            self._get_share_helper(
                share['share_proto']).remove_exports(dataset_name)
            self._delete_dataset_or_snapshot_with_retry(dataset_name)
            break
        else:
            # for/else: dataset was not found in the pool listing.
            LOG.warning(
                "Share with '%(id)s' ID and '%(name)s' NAME is "
                "absent on backend. Nothing has been deleted.",
                {'id': share['id'], 'name': dataset_name})
        self.private_storage.delete(share['id'])
543 @ensure_share_server_not_provided
544 def create_snapshot(self, context, snapshot, share_server=None):
545 """Is called to create a snapshot."""
546 dataset_name = self.private_storage.get(
547 snapshot['share_instance_id'], 'dataset_name')
548 snapshot_tag = self._get_snapshot_name(snapshot['id'])
549 snapshot_name = dataset_name + '@' + snapshot_tag
550 self.private_storage.update(
551 snapshot['snapshot_id'], {
552 'entity_type': 'snapshot',
553 'snapshot_tag': snapshot_tag,
554 }
555 )
556 self.zfs('snapshot', snapshot_name)
557 return {"provider_location": snapshot_name}
559 @ensure_share_server_not_provided
560 def delete_snapshot(self, context, snapshot, share_server=None):
561 """Is called to remove a snapshot."""
562 self._delete_snapshot(context, snapshot)
563 self.private_storage.delete(snapshot['snapshot_id'])
565 def _get_saved_snapshot_name(self, snapshot_instance):
566 snapshot_tag = self.private_storage.get(
567 snapshot_instance['snapshot_id'], 'snapshot_tag')
568 dataset_name = self.private_storage.get(
569 snapshot_instance['share_instance_id'], 'dataset_name')
570 snapshot_name = dataset_name + '@' + snapshot_tag
571 return snapshot_name
573 def _delete_snapshot(self, context, snapshot):
574 snapshot_name = self._get_saved_snapshot_name(snapshot)
575 out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
576 data = self.parse_zfs_answer(out)
577 for datum in data:
578 if datum['NAME'] == snapshot_name:
579 self._delete_dataset_or_snapshot_with_retry(snapshot_name)
580 break
581 else:
582 LOG.warning(
583 "Snapshot with '%(id)s' ID and '%(name)s' NAME is "
584 "absent on backend. Nothing has been deleted.",
585 {'id': snapshot['id'], 'name': snapshot_name})
587 @ensure_share_server_not_provided
588 def create_share_from_snapshot(self, context, share, snapshot,
589 share_server=None, parent_share=None):
590 """Is called to create a share from snapshot."""
591 src_backend_name = share_utils.extract_host(
592 snapshot.share_instance['host'], level='backend_name'
593 )
594 src_snapshot_name = self._get_saved_snapshot_name(snapshot)
595 dataset_name = self._get_dataset_name(share)
597 dst_backend_ssh_cmd = '%(username)s@%(host)s' % {
598 'username': self.configuration.zfs_ssh_username,
599 'host': self.service_ip,
600 }
602 dst_backend_pool_name = share_utils.extract_host(share['host'],
603 level='pool')
604 options = self._get_dataset_creation_options(share, is_readonly=False)
606 self.private_storage.update(
607 share['id'], {
608 'entity_type': 'share',
609 'dataset_name': dataset_name,
610 'ssh_cmd': dst_backend_ssh_cmd, # used in replication
611 'pool_name': dst_backend_pool_name, # used in replication
612 'used_options': options,
613 }
614 )
616 # NOTE(andrebeltrami): Implementing the support for create share
617 # from snapshot in different backends in different hosts
618 src_config = get_backend_configuration(src_backend_name)
619 src_backend_ssh_cmd = '%(username)s@%(host)s' % {
620 'username': src_config.zfs_ssh_username,
621 'host': src_config.zfs_service_ip,
622 }
623 self.execute(
624 # NOTE(vponomaryov): SSH is used as workaround for 'execute'
625 # implementation restriction that does not support usage
626 # of '|'.
627 'ssh', src_backend_ssh_cmd,
628 'sudo', 'zfs', 'send', '-vD', src_snapshot_name, '|',
629 'ssh', dst_backend_ssh_cmd,
630 'sudo', 'zfs', 'receive', '-v', dataset_name,
631 )
633 # Apply options based on used share type that may differ from
634 # one used for original share.
635 for option in options:
636 self.zfs('set', option, dataset_name)
638 # Delete with retry as right after creation it may be temporary busy.
639 self.execute_with_retry(
640 'sudo', 'zfs', 'destroy',
641 dataset_name + '@' + src_snapshot_name.split('@')[-1])
643 return self._get_share_helper(
644 share['share_proto']).create_exports(dataset_name)
646 def get_pool(self, share):
647 """Return pool name where the share resides on.
649 :param share: The share hosted by the driver.
650 """
651 pool_name = share_utils.extract_host(share['host'], level='pool')
652 return pool_name
    @ensure_share_server_not_provided
    def ensure_share(self, context, share, share_server=None):
        """Invoked to ensure that given share is exported.

        :raises exception.ShareResourceNotFound: if the backing dataset is
            not present in the share's zpool.
        """
        dataset_name = self.private_storage.get(share['id'], 'dataset_name')
        if not dataset_name:
            # Fallback for shares whose metadata was never stored.
            dataset_name = self._get_dataset_name(share)

        pool_name = share_utils.extract_host(share['host'], level='pool')
        out, err = self.zfs('list', '-r', pool_name)
        data = self.parse_zfs_answer(out)
        for datum in data:
            if datum['NAME'] == dataset_name:
                ssh_cmd = '%(username)s@%(host)s' % {
                    'username': self.configuration.zfs_ssh_username,
                    'host': self.service_ip,
                }
                self.private_storage.update(
                    share['id'], {'ssh_cmd': ssh_cmd})
                # Re-share the dataset in case the export was lost (e.g.
                # after host reboot), unless sharing is disabled.
                sharenfs = self.get_zfs_option(dataset_name, 'sharenfs')
                if sharenfs != 'off':
                    self.zfs('share', dataset_name)
                export_locations = self._get_share_helper(
                    share['share_proto']).get_exports(dataset_name)
                return export_locations
        else:
            raise exception.ShareResourceNotFound(share_id=share['id'])
    def get_network_allocations_number(self) -> int:
        """ZFS does not handle networking. Return 0."""
        return 0
685 @ensure_share_server_not_provided
686 def extend_share(self, share, new_size, share_server=None):
687 """Extends size of existing share."""
688 dataset_name = self._get_dataset_name(share)
689 self.zfs('set', 'quota=%sG' % new_size, dataset_name)
691 @ensure_share_server_not_provided
692 def shrink_share(self, share, new_size, share_server=None):
693 """Shrinks size of existing share."""
694 dataset_name = self._get_dataset_name(share)
695 consumed_space = self.get_zfs_option(dataset_name, 'used')
696 consumed_space = utils.translate_string_size_to_float(consumed_space)
697 if consumed_space >= new_size:
698 raise exception.ShareShrinkingPossibleDataLoss(
699 share_id=share['id'])
700 self.zfs('set', 'quota=%sG' % new_size, dataset_name)
702 @ensure_share_server_not_provided
703 def update_access(self, context, share, access_rules, add_rules,
704 delete_rules, update_rules, share_server=None):
705 """Updates access rules for given share."""
706 dataset_name = self._get_dataset_name(share)
707 executor = self._get_shell_executor_by_host(share['host'])
708 return self._get_share_helper(share['share_proto']).update_access(
709 dataset_name, access_rules, add_rules, delete_rules,
710 executor=executor)
    def manage_existing(self, share, driver_options):
        """Manage existing ZFS dataset as manila share.

        ZFSonLinux driver accepts only one driver_option 'size'.
        If an administrator provides this option, then such quota will be set
        to dataset and used as share size. Otherwise, driver will set quota
        equal to nearest bigger rounded integer of usage size.
        Driver does not expect mountpoint to be changed (should be equal
        to default that is "/%(dataset_name)s").

        :param share: share data
        :param driver_options: Empty dict or dict with 'size' option.
        :return: dict with share size and its export locations.
        """
        old_export_location = share["export_locations"][0]["path"]
        # Export path looks like 'ip:/pool/dataset'.
        old_dataset_name = old_export_location.split(":/")[-1]

        scheduled_pool_name = share_utils.extract_host(
            share["host"], level="pool")
        actual_pool_name = old_dataset_name.split("/")[0]

        new_dataset_name = self._get_dataset_name(share)

        # Calculate quota for managed dataset
        quota = driver_options.get("size")
        if not quota:
            consumed_space = self.get_zfs_option(old_dataset_name, "used")
            consumed_space = utils.translate_string_size_to_float(
                consumed_space)
            # Round up so the quota never truncates existing data.
            quota = int(consumed_space) + 1
        share["size"] = int(quota)

        # Save dataset-specific data in private storage
        options = self._get_dataset_creation_options(share, is_readonly=False)
        ssh_cmd = "%(username)s@%(host)s" % {
            "username": self.configuration.zfs_ssh_username,
            "host": self.service_ip,
        }

        # Perform checks on requested dataset
        if actual_pool_name != scheduled_pool_name:
            raise exception.ZFSonLinuxException(
                _("Cannot manage share '%(share_id)s' "
                  "(share_instance '%(si_id)s'), because scheduled "
                  "pool '%(sch)s' and actual '%(actual)s' differ.") % {
                    "share_id": share["share_id"],
                    "si_id": share["id"],
                    "sch": scheduled_pool_name,
                    "actual": actual_pool_name})

        out, err = self.zfs("list", "-r", actual_pool_name)
        data = self.parse_zfs_answer(out)
        for datum in data:
            if datum["NAME"] == old_dataset_name:
                break
        else:
            # for/else: requested dataset was not found in the pool.
            raise exception.ZFSonLinuxException(
                _("Cannot manage share '%(share_id)s' "
                  "(share_instance '%(si_id)s'), because dataset "
                  "'%(dataset)s' not found in zpool '%(zpool)s'.") % {
                    "share_id": share["share_id"],
                    "si_id": share["id"],
                    "dataset": old_dataset_name,
                    "zpool": actual_pool_name})

        # Unmount the dataset before attempting to rename and mount
        try:
            self._unmount_share_with_retry(old_dataset_name)
        except exception.ZFSonLinuxException:
            msg = _("Unable to unmount share before renaming and re-mounting.")
            raise exception.ZFSonLinuxException(message=msg)

        # Rename the dataset and mount with new name
        self.zfs_with_retry("rename", old_dataset_name, new_dataset_name)

        try:
            self.zfs("mount", new_dataset_name)
        except exception.ProcessExecutionError:
            # Workaround for bug/1785180
            out, err = self.zfs("mount")
            mounted = any([new_dataset_name in mountedfs
                           for mountedfs in out.splitlines()])
            if not mounted:
                raise

        # Apply options to dataset
        for option in options:
            self.zfs("set", option, new_dataset_name)

        # Get new export locations of renamed dataset
        export_locations = self._get_share_helper(
            share["share_proto"]).get_exports(new_dataset_name)

        self.private_storage.update(
            share["id"], {
                "entity_type": "share",
                "dataset_name": new_dataset_name,
                "ssh_cmd": ssh_cmd,  # used in replication
                "pool_name": actual_pool_name,  # used in replication
                "used_options": " ".join(options),
            }
        )

        return {"size": share["size"], "export_locations": export_locations}
    def unmanage(self, share):
        """Removes the specified share from Manila management."""
        # Only manila metadata is dropped; the dataset itself is kept.
        self.private_storage.delete(share['id'])
    def manage_existing_snapshot(self, snapshot_instance, driver_options):
        """Manage existing share snapshot with manila.

        :param snapshot_instance: SnapshotInstance data
        :param driver_options: expects only one optional key 'size'.
        :return: dict with share snapshot instance fields for update, example::

            {

                'size': 1,
                'provider_location': 'path/to/some/dataset@some_snapshot_tag',

            }

        """
        snapshot_size = int(driver_options.get("size", 0))
        old_provider_location = snapshot_instance.get("provider_location")
        old_snapshot_tag = old_provider_location.split("@")[-1]
        new_snapshot_tag = self._get_snapshot_name(snapshot_instance["id"])

        # Record both tags so the snapshot can be unmanaged/reverted later.
        self.private_storage.update(
            snapshot_instance["snapshot_id"], {
                "entity_type": "snapshot",
                "old_snapshot_tag": old_snapshot_tag,
                "snapshot_tag": new_snapshot_tag,
            }
        )

        # Verify the snapshot actually exists on the backend.
        try:
            self.zfs("list", "-r", "-t", "snapshot", old_provider_location)
        except exception.ProcessExecutionError as e:
            raise exception.ManageInvalidShareSnapshot(reason=e.stderr)

        if not snapshot_size:
            # No admin-provided size: use the snapshot's consumed space,
            # rounded up to a whole GiB.
            consumed_space = self.get_zfs_option(old_provider_location, "used")
            consumed_space = utils.translate_string_size_to_float(
                consumed_space)
            snapshot_size = int(math.ceil(consumed_space))

        dataset_name = self.private_storage.get(
            snapshot_instance["share_instance_id"], "dataset_name")
        new_provider_location = dataset_name + "@" + new_snapshot_tag

        self.zfs("rename", old_provider_location, new_provider_location)

        return {
            "size": snapshot_size,
            "provider_location": new_provider_location,
        }
    def unmanage_snapshot(self, snapshot_instance):
        """Unmanage dataset snapshot."""
        # Drop only manila metadata; the ZFS snapshot remains on backend.
        self.private_storage.delete(snapshot_instance["snapshot_id"])
875 @utils.retry(retry_param=exception.ZFSonLinuxException, retries=10)
876 def _unmount_share_with_retry(self, share_name):
877 out, err = self.execute("sudo", "mount")
878 if "%s " % share_name not in out:
879 return
880 self.zfs_with_retry("umount", "-f", share_name)
881 out, err = self.execute("sudo", "mount")
882 if "%s " % share_name in out:
883 raise exception.ZFSonLinuxException(
884 _("Unable to unmount dataset %s"), share_name)
886 def _get_replication_snapshot_prefix(self, replica):
887 """Returns replica-based snapshot prefix."""
888 replication_snapshot_prefix = "%s_%s" % (
889 self.replica_snapshot_prefix, replica['id'].replace('-', '_'))
890 return replication_snapshot_prefix
892 def _get_replication_snapshot_tag(self, replica):
893 """Returns replica- and time-based snapshot tag."""
894 current_time = timeutils.utcnow().isoformat()
895 snapshot_tag = "%s_time_%s" % (
896 self._get_replication_snapshot_prefix(replica), current_time)
897 return snapshot_tag
899 def _get_active_replica(self, replica_list):
900 for replica in replica_list:
901 if replica['replica_state'] == constants.REPLICA_STATE_ACTIVE:
902 return replica
903 msg = _("Active replica not found.")
904 raise exception.ReplicationException(reason=msg)
906 def _get_migration_snapshot_prefix(self, share_instance):
907 """Returns migration-based snapshot prefix."""
908 migration_snapshot_prefix = "%s_%s" % (
909 self.migration_snapshot_prefix,
910 share_instance['id'].replace('-', '_'))
911 return migration_snapshot_prefix
913 def _get_migration_snapshot_tag(self, share_instance):
914 """Returns migration- and time-based snapshot tag."""
915 current_time = timeutils.utcnow().isoformat()
916 snapshot_tag = "%s_time_%s" % (
917 self._get_migration_snapshot_prefix(share_instance), current_time)
918 snapshot_tag = (
919 snapshot_tag.replace('-', '_').replace('.', '_').replace(':', '_'))
920 return snapshot_tag
    @ensure_share_server_not_provided
    def create_replica(self, context, replica_list, new_replica,
                       access_rules, replica_snapshots, share_server=None):
        """Replicates the active replica to a new replica on this backend.

        Takes a temporary replication snapshot of the active replica's
        dataset on its host and sends the full stream (over SSH) into a
        new read-only dataset on this backend.

        :return: dict with 'export_locations', 'replica_state' and
            'access_rules_status' for the new replica.
        """
        active_replica = self._get_active_replica(replica_list)
        src_dataset_name = self.private_storage.get(
            active_replica['id'], 'dataset_name')
        ssh_to_src_cmd = self.private_storage.get(
            active_replica['id'], 'ssh_cmd')
        dst_dataset_name = self._get_dataset_name(new_replica)

        # SSH endpoint of this (destination) backend; the source host
        # pushes the snapshot stream here.
        ssh_cmd = '%(username)s@%(host)s' % {
            'username': self.configuration.zfs_ssh_username,
            'host': self.service_ip,
        }

        snapshot_tag = self._get_replication_snapshot_tag(new_replica)
        src_snapshot_name = (
            '%(dataset_name)s@%(snapshot_tag)s' % {
                'snapshot_tag': snapshot_tag,
                'dataset_name': src_dataset_name,
            }
        )
        # Save valuable data to DB
        self.private_storage.update(active_replica['id'], {
            'repl_snapshot_tag': snapshot_tag,
        })
        self.private_storage.update(new_replica['id'], {
            'entity_type': 'replica',
            'replica_type': 'readable',
            'dataset_name': dst_dataset_name,
            'ssh_cmd': ssh_cmd,
            'pool_name': share_utils.extract_host(
                new_replica['host'], level='pool'),
            'repl_snapshot_tag': snapshot_tag,
        })

        # Create temporary snapshot. It will exist until following replica sync
        # After it - new one will appear and so in loop.
        self.execute(
            'ssh', ssh_to_src_cmd,
            'sudo', 'zfs', 'snapshot', src_snapshot_name,
        )

        # Send/receive temporary snapshot
        out, err = self.execute(
            'ssh', ssh_to_src_cmd,
            'sudo', 'zfs', 'send', '-vDR', src_snapshot_name, '|',
            'ssh', ssh_cmd,
            'sudo', 'zfs', 'receive', '-v', dst_dataset_name,
        )
        msg = ("Info about replica '%(replica_id)s' creation is following: "
               "\n%(out)s")
        LOG.debug(msg, {'replica_id': new_replica['id'], 'out': out})

        # Make replica readonly
        self.zfs('set', 'readonly=on', dst_dataset_name)

        # Set original share size as quota to new replica
        self.zfs('set', 'quota=%sG' % active_replica['size'], dst_dataset_name)

        # Apply access rules from original share
        self._get_share_helper(new_replica['share_proto']).update_access(
            dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
            make_all_ro=True)

        return {
            'export_locations': self._get_share_helper(
                new_replica['share_proto']).create_exports(dst_dataset_name),
            'replica_state': constants.REPLICA_STATE_IN_SYNC,
            'access_rules_status': constants.STATUS_ACTIVE,
        }
    @ensure_share_server_not_provided
    def delete_replica(self, context, replica_list, replica_snapshots, replica,
                       share_server=None):
        """Deletes a replica. This is called on the destination backend.

        Destroys every snapshot of the replica's dataset first, then the
        dataset itself, and finally drops the private-storage record.
        A missing dataset is tolerated (logged, not raised).
        """
        pool_name = self.private_storage.get(replica['id'], 'pool_name')
        dataset_name = self.private_storage.get(replica['id'], 'dataset_name')
        if not dataset_name:
            # Fall back to the deterministic name when nothing was stored.
            dataset_name = self._get_dataset_name(replica)

        # Delete dataset's snapshots first
        out, err = self.zfs('list', '-r', '-t', 'snapshot', pool_name)
        data = self.parse_zfs_answer(out)
        for datum in data:
            if dataset_name in datum['NAME']:
                self._delete_dataset_or_snapshot_with_retry(datum['NAME'])

        # Now we delete dataset itself
        out, err = self.zfs('list', '-r', pool_name)
        data = self.parse_zfs_answer(out)
        for datum in data:
            if datum['NAME'] == dataset_name:
                self._get_share_helper(
                    replica['share_proto']).remove_exports(dataset_name)
                self._delete_dataset_or_snapshot_with_retry(dataset_name)
                break
        else:
            # for/else: reached only when the dataset was not found above.
            LOG.warning(
                "Share replica with '%(id)s' ID and '%(name)s' NAME is "
                "absent on backend. Nothing has been deleted.",
                {'id': replica['id'], 'name': dataset_name})
        self.private_storage.delete(replica['id'])
1027 @ensure_share_server_not_provided
1028 def update_replica_state(self, context, replica_list, replica,
1029 access_rules, replica_snapshots,
1030 share_server=None):
1031 """Syncs replica and updates its 'replica_state'."""
1032 return self._update_replica_state(
1033 context, replica_list, replica, replica_snapshots, access_rules)
    def _update_replica_state(self, context, replica_list, replica,
                              replica_snapshots=None, access_rules=None):
        """Incrementally sync 'replica' from the active replica.

        Takes a fresh replication snapshot on the active replica's host,
        sends the increment since the previously recorded snapshot to
        the destination dataset, prunes replication snapshots that are
        no longer referenced by any replica on either side and, when
        given, re-applies access rules in read-only mode.

        :return: constants.REPLICA_STATE_IN_SYNC
        """
        active_replica = self._get_active_replica(replica_list)
        src_dataset_name = self.private_storage.get(
            active_replica['id'], 'dataset_name')
        ssh_to_src_cmd = self.private_storage.get(
            active_replica['id'], 'ssh_cmd')
        ssh_to_dst_cmd = self.private_storage.get(
            replica['id'], 'ssh_cmd')
        dst_dataset_name = self.private_storage.get(
            replica['id'], 'dataset_name')

        # Create temporary snapshot
        previous_snapshot_tag = self.private_storage.get(
            replica['id'], 'repl_snapshot_tag')
        snapshot_tag = self._get_replication_snapshot_tag(replica)
        src_snapshot_name = src_dataset_name + '@' + snapshot_tag
        self.execute(
            'ssh', ssh_to_src_cmd,
            'sudo', 'zfs', 'snapshot', src_snapshot_name,
        )

        # Make sure it is readonly
        self.zfs('set', 'readonly=on', dst_dataset_name)

        # Send/receive diff between previous snapshot and last one
        out, err = self.execute(
            'ssh', ssh_to_src_cmd,
            'sudo', 'zfs', 'send', '-vDRI',
            previous_snapshot_tag, src_snapshot_name, '|',
            'ssh', ssh_to_dst_cmd,
            'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
        )
        msg = ("Info about last replica '%(replica_id)s' sync is following: "
               "\n%(out)s")
        LOG.debug(msg, {'replica_id': replica['id'], 'out': out})

        # Update DB data that will be used on following replica sync
        self.private_storage.update(active_replica['id'], {
            'repl_snapshot_tag': snapshot_tag,
        })
        self.private_storage.update(
            replica['id'], {'repl_snapshot_tag': snapshot_tag})

        # Destroy all snapshots on dst filesystem except referenced ones.
        # Referenced = the latest repl_snapshot_tag recorded for any replica.
        snap_references = set()
        for repl in replica_list:
            snap_references.add(
                self.private_storage.get(repl['id'], 'repl_snapshot_tag'))

        dst_pool_name = dst_dataset_name.split('/')[0]
        out, err = self.zfs('list', '-r', '-t', 'snapshot', dst_pool_name)
        data = self.parse_zfs_answer(out)
        for datum in data:
            if (dst_dataset_name in datum['NAME'] and
                    '@' + self.replica_snapshot_prefix in datum['NAME'] and
                    datum['NAME'].split('@')[-1] not in snap_references):
                self._delete_dataset_or_snapshot_with_retry(datum['NAME'])

        # Destroy all snapshots on src filesystem except referenced ones.
        src_pool_name = src_snapshot_name.split('/')[0]
        out, err = self.execute(
            'ssh', ssh_to_src_cmd,
            'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', src_pool_name,
        )
        data = self.parse_zfs_answer(out)
        full_src_snapshot_prefix = (
            src_dataset_name + '@' +
            self._get_replication_snapshot_prefix(replica))
        for datum in data:
            if (full_src_snapshot_prefix in datum['NAME'] and
                    datum['NAME'].split('@')[-1] not in snap_references):
                self.execute_with_retry(
                    'ssh', ssh_to_src_cmd,
                    'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
                )

        if access_rules:
            # Apply access rules from original share
            # TODO(vponomaryov): we should remove somehow rules that were
            # deleted on active replica after creation of secondary replica.
            # For the moment there will be difference and it can be considered
            # as a bug.
            self._get_share_helper(replica['share_proto']).update_access(
                dst_dataset_name, access_rules, add_rules=[], delete_rules=[],
                make_all_ro=True)

        # Return results
        return constants.REPLICA_STATE_IN_SYNC
1125 @ensure_share_server_not_provided
1126 def promote_replica(self, context, replica_list, replica, access_rules,
1127 share_server=None, quiesce_wait_time=None):
1128 """Promotes secondary replica to active and active to secondary."""
1129 active_replica = self._get_active_replica(replica_list)
1130 src_dataset_name = self.private_storage.get(
1131 active_replica['id'], 'dataset_name')
1132 ssh_to_src_cmd = self.private_storage.get(
1133 active_replica['id'], 'ssh_cmd')
1134 dst_dataset_name = self.private_storage.get(
1135 replica['id'], 'dataset_name')
1136 replica_dict = {
1137 r['id']: {
1138 'id': r['id'],
1139 # NOTE(vponomaryov): access rules will be updated in next
1140 # 'sync' operation.
1141 'access_rules_status': constants.SHARE_INSTANCE_RULES_SYNCING,
1142 }
1143 for r in replica_list
1144 }
1145 try:
1146 # Mark currently active replica as readonly
1147 self.execute(
1148 'ssh', ssh_to_src_cmd,
1149 'set', 'readonly=on', src_dataset_name,
1150 )
1152 # Create temporary snapshot of currently active replica
1153 snapshot_tag = self._get_replication_snapshot_tag(active_replica)
1154 src_snapshot_name = src_dataset_name + '@' + snapshot_tag
1155 self.execute(
1156 'ssh', ssh_to_src_cmd,
1157 'sudo', 'zfs', 'snapshot', src_snapshot_name,
1158 )
1160 # Apply temporary snapshot to all replicas
1161 for repl in replica_list:
1162 if repl['replica_state'] == constants.REPLICA_STATE_ACTIVE:
1163 continue
1164 previous_snapshot_tag = self.private_storage.get(
1165 repl['id'], 'repl_snapshot_tag')
1166 dataset_name = self.private_storage.get(
1167 repl['id'], 'dataset_name')
1168 ssh_to_dst_cmd = self.private_storage.get(
1169 repl['id'], 'ssh_cmd')
1171 try:
1172 # Send/receive diff between previous snapshot and last one
1173 out, err = self.execute(
1174 'ssh', ssh_to_src_cmd,
1175 'sudo', 'zfs', 'send', '-vDRI',
1176 previous_snapshot_tag, src_snapshot_name, '|',
1177 'ssh', ssh_to_dst_cmd,
1178 'sudo', 'zfs', 'receive', '-vF', dataset_name,
1179 )
1180 except exception.ProcessExecutionError as e:
1181 LOG.warning("Failed to sync replica %(id)s. %(e)s",
1182 {'id': repl['id'], 'e': e})
1183 replica_dict[repl['id']]['replica_state'] = (
1184 constants.REPLICA_STATE_OUT_OF_SYNC)
1185 continue
1187 msg = ("Info about last replica '%(replica_id)s' "
1188 "sync is following: \n%(out)s")
1189 LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
1191 # Update latest replication snapshot for replica
1192 self.private_storage.update(
1193 repl['id'], {'repl_snapshot_tag': snapshot_tag})
1195 # Update latest replication snapshot for currently active replica
1196 self.private_storage.update(
1197 active_replica['id'], {'repl_snapshot_tag': snapshot_tag})
1199 replica_dict[active_replica['id']]['replica_state'] = (
1200 constants.REPLICA_STATE_IN_SYNC)
1201 except Exception as e:
1202 LOG.warning(
1203 "Failed to update currently active replica. \n%s", e)
1205 replica_dict[active_replica['id']]['replica_state'] = (
1206 constants.REPLICA_STATE_OUT_OF_SYNC)
1208 # Create temporary snapshot of new replica and sync it with other
1209 # secondary replicas.
1210 snapshot_tag = self._get_replication_snapshot_tag(replica)
1211 src_snapshot_name = dst_dataset_name + '@' + snapshot_tag
1212 ssh_to_src_cmd = self.private_storage.get(replica['id'], 'ssh_cmd')
1213 self.zfs('snapshot', src_snapshot_name)
1214 for repl in replica_list:
1215 if (repl['replica_state'] == constants.REPLICA_STATE_ACTIVE or
1216 repl['id'] == replica['id']):
1217 continue
1218 previous_snapshot_tag = self.private_storage.get(
1219 repl['id'], 'repl_snapshot_tag')
1220 dataset_name = self.private_storage.get(
1221 repl['id'], 'dataset_name')
1222 ssh_to_dst_cmd = self.private_storage.get(
1223 repl['id'], 'ssh_cmd')
1225 try:
1226 # Send/receive diff between previous snapshot and last one
1227 out, err = self.execute(
1228 'ssh', ssh_to_src_cmd,
1229 'sudo', 'zfs', 'send', '-vDRI',
1230 previous_snapshot_tag, src_snapshot_name, '|',
1231 'ssh', ssh_to_dst_cmd,
1232 'sudo', 'zfs', 'receive', '-vF', dataset_name,
1233 )
1234 except exception.ProcessExecutionError as e:
1235 LOG.warning("Failed to sync replica %(id)s. %(e)s",
1236 {'id': repl['id'], 'e': e})
1237 replica_dict[repl['id']]['replica_state'] = (
1238 constants.REPLICA_STATE_OUT_OF_SYNC)
1239 continue
1241 msg = ("Info about last replica '%(replica_id)s' "
1242 "sync is following: \n%(out)s")
1243 LOG.debug(msg, {'replica_id': repl['id'], 'out': out})
1245 # Update latest replication snapshot for replica
1246 self.private_storage.update(
1247 repl['id'], {'repl_snapshot_tag': snapshot_tag})
1249 # Update latest replication snapshot for new active replica
1250 self.private_storage.update(
1251 replica['id'], {'repl_snapshot_tag': snapshot_tag})
1253 replica_dict[replica['id']]['replica_state'] = (
1254 constants.REPLICA_STATE_ACTIVE)
1256 self._get_share_helper(replica['share_proto']).update_access(
1257 dst_dataset_name, access_rules, add_rules=[], delete_rules=[])
1259 replica_dict[replica['id']]['access_rules_status'] = (
1260 constants.STATUS_ACTIVE)
1262 self.zfs('set', 'readonly=off', dst_dataset_name)
1264 return list(replica_dict.values())
    @ensure_share_server_not_provided
    def create_replicated_snapshot(self, context, replica_list,
                                   replica_snapshots, share_server=None):
        """Create a snapshot and update across the replicas.

        Two snapshots are taken on the active replica's host: the
        user-visible one and an extra replication snapshot that is used
        to push the diff to every secondary replica.

        :return: list of snapshot-instance update dicts (id, status).
        """
        active_replica = self._get_active_replica(replica_list)
        src_dataset_name = self.private_storage.get(
            active_replica['id'], 'dataset_name')
        ssh_to_src_cmd = self.private_storage.get(
            active_replica['id'], 'ssh_cmd')
        replica_snapshots_dict = {
            si['id']: {'id': si['id']} for si in replica_snapshots}

        active_snapshot_instance_id = [
            si['id'] for si in replica_snapshots
            if si['share_instance_id'] == active_replica['id']][0]
        snapshot_tag = self._get_snapshot_name(active_snapshot_instance_id)
        # Replication should not be dependent on manually created snapshots
        # so, create additional one, newer, that will be used for replication
        # synchronizations.
        repl_snapshot_tag = self._get_replication_snapshot_tag(active_replica)
        src_snapshot_name = src_dataset_name + '@' + repl_snapshot_tag

        self.private_storage.update(
            replica_snapshots[0]['snapshot_id'], {
                'entity_type': 'snapshot',
                'snapshot_tag': snapshot_tag,
            }
        )
        for tag in (snapshot_tag, repl_snapshot_tag):
            self.execute(
                'ssh', ssh_to_src_cmd,
                'sudo', 'zfs', 'snapshot', src_dataset_name + '@' + tag,
            )

        # Populate snapshot to all replicas
        for replica_snapshot in replica_snapshots:
            replica_id = replica_snapshot['share_instance_id']
            if replica_id == active_replica['id']:
                # Active replica already holds the snapshot.
                replica_snapshots_dict[replica_snapshot['id']]['status'] = (
                    constants.STATUS_AVAILABLE)
                continue
            previous_snapshot_tag = self.private_storage.get(
                replica_id, 'repl_snapshot_tag')
            dst_dataset_name = self.private_storage.get(
                replica_id, 'dataset_name')
            ssh_to_dst_cmd = self.private_storage.get(replica_id, 'ssh_cmd')

            try:
                # Send/receive diff between previous snapshot and last one
                out, err = self.execute(
                    'ssh', ssh_to_src_cmd,
                    'sudo', 'zfs', 'send', '-vDRI',
                    previous_snapshot_tag, src_snapshot_name, '|',
                    'ssh', ssh_to_dst_cmd,
                    'sudo', 'zfs', 'receive', '-vF', dst_dataset_name,
                )
            except exception.ProcessExecutionError as e:
                LOG.warning(
                    "Failed to sync snapshot instance %(id)s. %(e)s",
                    {'id': replica_snapshot['id'], 'e': e})
                replica_snapshots_dict[replica_snapshot['id']]['status'] = (
                    constants.STATUS_ERROR)
                continue

            replica_snapshots_dict[replica_snapshot['id']]['status'] = (
                constants.STATUS_AVAILABLE)

            msg = ("Info about last replica '%(replica_id)s' "
                   "sync is following: \n%(out)s")
            LOG.debug(msg, {'replica_id': replica_id, 'out': out})

            # Update latest replication snapshot for replica
            self.private_storage.update(
                replica_id, {'repl_snapshot_tag': repl_snapshot_tag})

        # Update latest replication snapshot for currently active replica
        self.private_storage.update(
            active_replica['id'], {'repl_snapshot_tag': repl_snapshot_tag})

        return list(replica_snapshots_dict.values())
    @ensure_share_server_not_provided
    def delete_replicated_snapshot(self, context, replica_list,
                                   replica_snapshots, share_server=None):
        """Delete a snapshot by deleting its instances across the replicas.

        The instance on the active replica is removed locally; instances
        on secondary replicas are destroyed remotely over SSH.

        :return: list of snapshot-instance update dicts (id, status).
        """
        active_replica = self._get_active_replica(replica_list)
        replica_snapshots_dict = {
            si['id']: {'id': si['id']} for si in replica_snapshots}

        for replica_snapshot in replica_snapshots:
            replica_id = replica_snapshot['share_instance_id']
            snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
            if active_replica['id'] == replica_id:
                self._delete_snapshot(context, replica_snapshot)
                replica_snapshots_dict[replica_snapshot['id']]['status'] = (
                    constants.STATUS_DELETED)
                continue
            ssh_cmd = self.private_storage.get(replica_id, 'ssh_cmd')
            out, err = self.execute(
                'ssh', ssh_cmd,
                'sudo', 'zfs', 'list', '-r', '-t', 'snapshot', snapshot_name,
            )
            data = self.parse_zfs_answer(out)
            for datum in data:
                if datum['NAME'] != snapshot_name:
                    continue
                self.execute_with_retry(
                    'ssh', ssh_cmd,
                    'sudo', 'zfs', 'destroy', '-f', datum['NAME'],
                )

            self.private_storage.delete(replica_snapshot['id'])
            replica_snapshots_dict[replica_snapshot['id']]['status'] = (
                constants.STATUS_DELETED)

        # NOTE(review): 'replica_snapshot' is read after the loop -
        # presumably all instances share one parent 'snapshot_id'; confirm
        # before refactoring this out of the loop variable.
        self.private_storage.delete(replica_snapshot['snapshot_id'])
        return list(replica_snapshots_dict.values())
1384 @ensure_share_server_not_provided
1385 def update_replicated_snapshot(self, context, replica_list,
1386 share_replica, replica_snapshots,
1387 replica_snapshot, share_server=None):
1388 """Update the status of a snapshot instance that lives on a replica."""
1390 self._update_replica_state(context, replica_list, share_replica)
1392 snapshot_name = self._get_saved_snapshot_name(replica_snapshot)
1394 out, err = self.zfs('list', '-r', '-t', 'snapshot', snapshot_name)
1395 data = self.parse_zfs_answer(out)
1396 snapshot_found = False
1397 for datum in data:
1398 if datum['NAME'] == snapshot_name:
1399 snapshot_found = True
1400 break
1401 return_dict = {'id': replica_snapshot['id']}
1402 if snapshot_found:
1403 return_dict.update({'status': constants.STATUS_AVAILABLE})
1404 else:
1405 return_dict.update({'status': constants.STATUS_ERROR})
1407 return return_dict
1409 @ensure_share_server_not_provided
1410 def migration_check_compatibility(
1411 self, context, source_share, destination_share,
1412 share_server=None, destination_share_server=None):
1413 """Is called to test compatibility with destination backend."""
1414 backend_name = share_utils.extract_host(
1415 destination_share['host'], level='backend_name')
1416 config = get_backend_configuration(backend_name)
1417 compatible = self.configuration.share_driver == config.share_driver
1418 return {
1419 'compatible': compatible,
1420 'writable': False,
1421 'preserve_metadata': True,
1422 'nondisruptive': True,
1423 }
    @ensure_share_server_not_provided
    def migration_start(
            self, context, source_share, destination_share, source_snapshots,
            snapshot_mappings, share_server=None,
            destination_share_server=None):
        """Is called to start share migration.

        Snapshots the source dataset, then launches a detached
        'zfs send | zfs receive' pipeline (written into a temporary
        shell script and started via nohup) that streams it to the
        destination backend. Completion is later polled by
        'migration_continue'.
        """

        src_dataset_name = self.private_storage.get(
            source_share['id'], 'dataset_name')
        dst_dataset_name = self._get_dataset_name(destination_share)
        backend_name = share_utils.extract_host(
            destination_share['host'], level='backend_name')
        ssh_cmd = '%(username)s@%(host)s' % {
            'username': self.configuration.zfs_ssh_username,
            'host': self.configuration.zfs_service_ip,
        }
        config = get_backend_configuration(backend_name)
        remote_ssh_cmd = '%(username)s@%(host)s' % {
            'username': config.zfs_ssh_username,
            'host': config.zfs_service_ip,
        }
        snapshot_tag = self._get_migration_snapshot_tag(destination_share)
        src_snapshot_name = (
            '%(dataset_name)s@%(snapshot_tag)s' % {
                'snapshot_tag': snapshot_tag,
                'dataset_name': src_dataset_name,
            }
        )

        # Save valuable data to DB
        self.private_storage.update(source_share['id'], {
            'migr_snapshot_tag': snapshot_tag,
        })
        self.private_storage.update(destination_share['id'], {
            'entity_type': 'share',
            'dataset_name': dst_dataset_name,
            'ssh_cmd': remote_ssh_cmd,
            'pool_name': share_utils.extract_host(
                destination_share['host'], level='pool'),
            'migr_snapshot_tag': snapshot_tag,
        })

        # Create temporary snapshot on src host.
        self.execute('sudo', 'zfs', 'snapshot', src_snapshot_name)

        # Send/receive temporary snapshot
        cmd = (
            'ssh ' + ssh_cmd + ' '
            'sudo zfs send -vDR ' + src_snapshot_name + ' '
            '| ssh ' + remote_ssh_cmd + ' '
            'sudo zfs receive -v ' + dst_dataset_name
        )
        filename = dst_dataset_name.replace('/', '_')
        with utils.tempdir() as tmpdir:
            tmpfilename = os.path.join(tmpdir, '%s.sh' % filename)
            with open(tmpfilename, "w") as migr_script:
                migr_script.write(cmd)
            self.execute('sudo', 'chmod', '755', tmpfilename)
            # NOTE(review): '&' is passed as a literal argument to nohup,
            # not interpreted by a shell, and the temp dir is removed when
            # this 'with' block exits while the script may still run -
            # confirm the executor's detach semantics before changing.
            self.execute('nohup', tmpfilename, '&')
1485 @ensure_share_server_not_provided
1486 def migration_continue(
1487 self, context, source_share, destination_share, source_snapshots,
1488 snapshot_mappings, share_server=None,
1489 destination_share_server=None):
1490 """Is called in source share's backend to continue migration."""
1492 snapshot_tag = self.private_storage.get(
1493 destination_share['id'], 'migr_snapshot_tag')
1495 out, err = self.execute('ps', 'aux')
1496 if not '@%s' % snapshot_tag in out:
1497 dst_dataset_name = self.private_storage.get(
1498 destination_share['id'], 'dataset_name')
1499 try:
1500 self.execute(
1501 'sudo', 'zfs', 'get', 'quota', dst_dataset_name,
1502 executor=self._get_shell_executor_by_host(
1503 destination_share['host']),
1504 )
1505 return True
1506 except exception.ProcessExecutionError as e:
1507 raise exception.ZFSonLinuxException(msg=_(
1508 'Migration process is absent and dst dataset '
1509 'returned following error: %s') % e)
    @ensure_share_server_not_provided
    def migration_complete(
            self, context, source_share, destination_share, source_snapshots,
            snapshot_mappings, share_server=None,
            destination_share_server=None):
        """Is called to perform 2nd phase of driver migration of a given share.

        Drops the temporary migration snapshot on the destination host,
        creates export locations for the new share instance and deletes
        the source share (with its migration snapshot) from this host.

        :return: dict with 'export_locations' of the migrated share.
        """
        dst_dataset_name = self.private_storage.get(
            destination_share['id'], 'dataset_name')
        snapshot_tag = self.private_storage.get(
            destination_share['id'], 'migr_snapshot_tag')
        dst_snapshot_name = (
            '%(dataset_name)s@%(snapshot_tag)s' % {
                'snapshot_tag': snapshot_tag,
                'dataset_name': dst_dataset_name,
            }
        )

        dst_executor = self._get_shell_executor_by_host(
            destination_share['host'])

        # Destroy temporary migration snapshot on dst host
        self.execute(
            'sudo', 'zfs', 'destroy', dst_snapshot_name,
            executor=dst_executor,
        )

        # Get export locations of new share instance
        export_locations = self._get_share_helper(
            destination_share['share_proto']).create_exports(
                dst_dataset_name,
                executor=dst_executor)

        # Destroy src share and temporary migration snapshot on src (this) host
        self.delete_share(context, source_share)

        return {'export_locations': export_locations}
    @ensure_share_server_not_provided
    def migration_cancel(
            self, context, source_share, destination_share, source_snapshots,
            snapshot_mappings, share_server=None,
            destination_share_server=None):
        """Is called to cancel driver migration.

        Kills the background 'zfs send' process if still running, then
        removes the temporary migration snapshot on the source host and
        the partially received dataset on the destination host. Cleanup
        failures on the destination are logged, not raised.
        """

        src_dataset_name = self.private_storage.get(
            source_share['id'], 'dataset_name')
        dst_dataset_name = self.private_storage.get(
            destination_share['id'], 'dataset_name')
        ssh_cmd = self.private_storage.get(
            destination_share['id'], 'ssh_cmd')
        snapshot_tag = self.private_storage.get(
            destination_share['id'], 'migr_snapshot_tag')

        # Kill migration process if exists
        try:
            out, err = self.execute('ps', 'aux')
            lines = out.split('\n')
            for line in lines:
                if '@%s' % snapshot_tag in line:
                    # Second whitespace-separated column of 'ps aux'
                    # output is the PID.
                    migr_pid = [
                        x for x in line.strip().split(' ') if x != ''][1]
                    self.execute('sudo', 'kill', '-9', migr_pid)
        except exception.ProcessExecutionError as e:
            LOG.warning(
                "Caught following error trying to kill migration process: %s",
                e)

        # Sleep couple of seconds before destroying updated objects
        time.sleep(2)

        # Destroy snapshot on source host
        self._delete_dataset_or_snapshot_with_retry(
            src_dataset_name + '@' + snapshot_tag)

        # Destroy dataset and its migration snapshot on destination host
        try:
            self.execute(
                'ssh', ssh_cmd,
                'sudo', 'zfs', 'destroy', '-r', dst_dataset_name,
            )
        except exception.ProcessExecutionError as e:
            LOG.warning(
                "Failed to destroy destination dataset with following error: "
                "%s",
                e)

        LOG.debug(
            "Migration of share with ID '%s' has been canceled.",
            source_share["id"])