Coverage for manila/data/manager.py: 77%
376 statements
« prev ^ index » next — coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright 2015, Hitachi Data Systems.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
15"""
16Data Service
17"""
19import os
20import shutil
22from oslo_config import cfg
23from oslo_config import types
24from oslo_log import log
25from oslo_service import periodic_task
26from oslo_utils import excutils
27from oslo_utils import importutils
29from manila.common import constants
30from manila import context
31from manila.data import helper
32from manila.data import utils as data_utils
33from manila import exception
34from manila import manager
35from manila import quota
36from manila.share import rpcapi as share_rpc
37from manila import utils
# Global quota engine handle; used to release backup quota on delete.
QUOTAS = quota.QUOTAS

from manila.i18n import _

LOG = log.getLogger(__name__)

# Options controlling backup behavior: which driver to load, the command
# templates used to (un)mount shares during backup, and which files and
# protocol/access-type pairs are handled.
backup_opts = [
    cfg.StrOpt(
        'backup_driver',
        default='manila.data.drivers.nfs.NFSBackupDriver',
        help='Driver to use for backups.'),
    cfg.StrOpt(
        'backup_share_mount_template',
        default='mount -vt %(proto)s %(options)s %(export)s %(path)s',
        help="The template for mounting shares during backup. Must specify "
             "the executable with all necessary parameters for the protocol "
             "supported. 'proto' template element may not be required if "
             "included in the command. 'export' and 'path' template elements "
             "are required. It is advisable to separate different commands "
             "per backend."),
    cfg.StrOpt(
        'backup_share_unmount_template',
        default='umount -v %(path)s',
        help="The template for unmounting shares during backup. Must "
             "specify the executable with all necessary parameters for the "
             "protocol supported. 'path' template element is required. It is "
             "advisable to separate different commands per backend."),
    cfg.ListOpt(
        'backup_ignore_files',
        default=['lost+found'],
        help="List of files and folders to be ignored when backing up "
             "shares. Items should be names (not including any path)."),
    cfg.Opt(
        'backup_protocol_access_mapping',
        type=types.Dict(types.List(types.String(), bounds=True)),
        default={'ip': ['nfs']},
        help="Protocol access mapping for backup. Should be a "
             "dictionary comprised of "
             "{'access_type1': ['share_proto1', 'share_proto2'],"
             " 'access_type2': ['share_proto2', 'share_proto3']}."),
]

# Options shared by migration and backup data copies: temporary mount
# locations, optional per-file hash verification, and how often the
# periodic tasks poll for copy progress.
data_opts = [
    cfg.StrOpt(
        'mount_tmp_location',
        default='/tmp/',
        help="Temporary path to create and mount shares during migration."),
    cfg.StrOpt(
        'backup_mount_tmp_location',
        default='/tmp/',
        help="Temporary path to create and mount backup during share backup."),
    cfg.BoolOpt(
        'check_hash',
        default=False,
        help="Chooses whether hash of each file should be checked on data "
             "copying."),
    cfg.IntOpt(
        'backup_continue_update_interval',
        default=10,
        help='This value, specified in seconds, determines how often '
             'the data manager will poll to perform the next steps of '
             'backup such as fetch the progress of backup.'),
    cfg.IntOpt(
        'restore_continue_update_interval',
        default=10,
        help='This value, specified in seconds, determines how often '
             'the data manager will poll to perform the next steps of '
             'restore such as fetch the progress of restore.')
]

CONF = cfg.CONF
CONF.register_opts(data_opts)
CONF.register_opts(backup_opts)
class DataManager(manager.Manager):
    """Receives requests to handle data and sends responses."""

    RPC_API_VERSION = '1.1'

    def __init__(self, service_name=None, *args, **kwargs):
        """Initialize the data manager.

        :param service_name: unused; accepted for manager interface parity.
        """
        super(DataManager, self).__init__(*args, **kwargs)
        # Backup driver is loaded dynamically from config so deployments can
        # plug in alternative backup backends.
        self.backup_driver = importutils.import_object(CONF.backup_driver)
        # Maps share_id -> in-flight data_utils.Copy object; consulted by
        # data_copy_cancel / data_copy_get_progress.
        self.busy_tasks_shares = {}
        self.service_id = None
126 def init_host(self, service_id=None):
127 self.service_id = service_id
128 ctxt = context.get_admin_context()
129 shares = self.db.share_get_all(ctxt)
130 for share in shares:
131 if share['task_state'] in constants.BUSY_COPYING_STATES: 131 ↛ 130line 131 didn't jump to line 130 because the condition on line 131 was always true
132 self.db.share_update(
133 ctxt, share['id'],
134 {'task_state': constants.TASK_STATE_DATA_COPYING_ERROR})
    def migration_start(self, context, ignore_list, share_id,
                        share_instance_id, dest_share_instance_id,
                        connection_info_src, connection_info_dest):
        """Copy a share's contents between two share instances.

        RPC entry point for the data-copy phase of share migration.

        :param ignore_list: file/folder names to skip during the copy
        :param connection_info_src: dict with 'mount'/'unmount' command
            templates (and optional 'access_mapping') for the source
        :param connection_info_dest: same keys, for the destination
        :raises exception.ShareDataCopyFailed: if the copy fails; the
            share's task_state is set to DATA_COPYING_ERROR first
        """
        LOG.debug(
            "Received request to migrate share content from share instance "
            "%(instance_id)s to instance %(dest_instance_id)s.",
            {'instance_id': share_instance_id,
             'dest_instance_id': dest_share_instance_id})

        share_ref = self.db.share_get(context, share_id)
        share_instance_ref = self.db.share_instance_get(
            context, share_instance_id, with_share_data=True)

        share_rpcapi = share_rpc.ShareAPI()

        mount_path = CONF.mount_tmp_location

        try:
            copy = data_utils.Copy(
                os.path.join(mount_path, share_instance_id),
                os.path.join(mount_path, dest_share_instance_id),
                ignore_list, CONF.check_hash)

            info_src = {
                'share_id': share_ref['id'],
                'share_instance_id': share_instance_id,
                'mount': connection_info_src['mount'],
                'unmount': connection_info_src['unmount'],
                'access_mapping': connection_info_src.get(
                    'access_mapping', {}),
                'mount_point': os.path.join(mount_path,
                                            share_instance_id),
            }

            info_dest = {
                # share_id is None so _copy_share_data tracks task_state on
                # the source share only.
                'share_id': None,
                'share_instance_id': dest_share_instance_id,
                'mount': connection_info_dest['mount'],
                'unmount': connection_info_dest['unmount'],
                'access_mapping': connection_info_dest.get(
                    'access_mapping', {}),
                'mount_point': os.path.join(mount_path,
                                            dest_share_instance_id),
            }

            self._copy_share_data(context, copy, info_src, info_dest)
        except exception.ShareDataCopyCancelled:
            # Cancellation is not an error: tell the share service to finish
            # (roll back) the migration, then return quietly.
            share_rpcapi.migration_complete(
                context, share_instance_ref, dest_share_instance_id)
            return
        except Exception:
            self.db.share_update(
                context, share_id,
                {'task_state': constants.TASK_STATE_DATA_COPYING_ERROR})
            msg = _("Failed to copy contents from instance %(src)s to "
                    "instance %(dest)s.") % {'src': share_instance_id,
                                             'dest': dest_share_instance_id}
            LOG.exception(msg)
            # Notify the share service so the failed migration can be
            # completed/rolled back before re-raising.
            share_rpcapi.migration_complete(
                context, share_instance_ref, dest_share_instance_id)
            raise exception.ShareDataCopyFailed(reason=msg)
        finally:
            # Always drop the in-flight copy entry for this share.
            self.busy_tasks_shares.pop(share_id, None)

        LOG.info(
            "Completed copy operation of migrating share content from share "
            "instance %(instance_id)s to instance %(dest_instance_id)s.",
            {'instance_id': share_instance_id,
             'dest_instance_id': dest_share_instance_id})
207 def data_copy_cancel(self, context, share_id):
208 LOG.debug("Received request to cancel data copy "
209 "of share %s.", share_id)
210 copy = self.busy_tasks_shares.get(share_id)
211 if copy:
212 copy.cancel()
213 else:
214 msg = _("Data copy for migration of share %s cannot be cancelled"
215 " at this moment.") % share_id
216 LOG.error(msg)
217 raise exception.InvalidShare(reason=msg)
219 def data_copy_get_progress(self, context, share_id):
220 LOG.debug("Received request to get data copy information "
221 "of share %s.", share_id)
222 copy = self.busy_tasks_shares.get(share_id)
223 if copy:
224 result = copy.get_progress()
225 LOG.info("Obtained following data copy information "
226 "of share %(share)s: %(info)s.",
227 {'share': share_id,
228 'info': result})
229 return result
230 else:
231 msg = _("Migration of share %s data copy progress cannot be "
232 "obtained at this moment.") % share_id
233 LOG.error(msg)
234 raise exception.InvalidShare(reason=msg)
    def _copy_share_data(self, context, copy, info_src, info_dest):
        """Copy share data between source and destination.

        e.g. During migration source and destination both are shares
        and during backup create, destination is backup location
        while during backup restore, source is backup location.

        1. Mount source and destination. Create access rules.
        2. Perform copy
        3. Unmount source and destination. Cleanup access rules.

        :param copy: a data_utils.Copy object driving the transfer
        :param info_src: dict describing the source endpoint ('share_id',
            'share_instance_id', 'mount', 'unmount', 'mount_point',
            'access_mapping')
        :param info_dest: same keys, describing the destination endpoint
        :raises exception.ShareDataCopyFailed: on mount or copy failure
        :raises exception.ShareDataCopyCancelled: if the copy was cancelled
        """
        mount_path = CONF.mount_tmp_location

        # The share whose task_state tracks this copy: the source for
        # migration/backup, the destination for restore.
        if info_src.get('share_id'):
            share_id = info_src['share_id']
        elif info_dest.get('share_id'):
            share_id = info_dest['share_id']
        else:
            msg = _("Share data copy failed because of undefined share.")
            LOG.exception(msg)
            raise exception.ShareDataCopyFailed(reason=msg)

        share_instance_src = None
        share_instance_dest = None
        if info_src['share_instance_id']:
            share_instance_src = self.db.share_instance_get(
                context, info_src['share_instance_id'], with_share_data=True)
        if info_dest['share_instance_id']:
            share_instance_dest = self.db.share_instance_get(
                context, info_dest['share_instance_id'], with_share_data=True)

        share = self.db.share_get(context, share_id)
        self.db.share_update(
            context, share['id'],
            {'task_state': constants.TASK_STATE_DATA_COPYING_STARTING})

        helper_src = helper.DataServiceHelper(context, self.db, share)
        helper_dest = helper_src

        # One access ref is created for the data service, keyed off
        # whichever endpoint is an actual share instance (backup/restore
        # have only one instance; migration has two).
        if share_instance_src:
            access_ref_src = helper_src.allow_access_to_data_service(
                share_instance_src, info_src, share_instance_dest, info_dest)
            access_ref_dest = access_ref_src
        elif share_instance_dest:
            access_ref_src = helper_src.allow_access_to_data_service(
                share_instance_dest, info_dest, share_instance_src, info_src)
            access_ref_dest = access_ref_src

        def _call_cleanups(items):
            # Best-effort cleanup dispatcher, invoked in the order given.
            for item in items:
                if 'unmount_src' == item:
                    helper_src.cleanup_unmount_temp_folder(
                        info_src, mount_path)
                elif 'temp_folder_src' == item:
                    helper_src.cleanup_temp_folder(
                        mount_path, info_src['share_instance_id'])
                elif 'temp_folder_dest' == item:
                    helper_dest.cleanup_temp_folder(
                        mount_path, info_dest['share_instance_id'])
                elif 'access_src' == item and share_instance_src:
                    helper_src.cleanup_data_access(
                        access_ref_src, share_instance_src)
                elif 'access_dest' == item and share_instance_dest:
                    helper_dest.cleanup_data_access(
                        access_ref_dest, share_instance_dest)

        try:
            helper_src.mount_share_instance_or_backup(info_src, mount_path)
        except Exception:
            msg = _("Share data copy failed attempting to mount source "
                    "at %s.") % info_src['mount_point']
            LOG.exception(msg)
            _call_cleanups(['temp_folder_src', 'access_dest', 'access_src'])
            raise exception.ShareDataCopyFailed(reason=msg)

        try:
            helper_dest.mount_share_instance_or_backup(info_dest, mount_path)
        except Exception:
            msg = _("Share data copy failed attempting to mount destination "
                    "at %s.") % info_dest['mount_point']
            LOG.exception(msg)
            # Source is already mounted at this point, so it must be
            # unmounted too.
            _call_cleanups(['temp_folder_dest', 'unmount_src',
                            'temp_folder_src', 'access_dest', 'access_src'])
            raise exception.ShareDataCopyFailed(reason=msg)

        # Register the copy so cancel/progress RPCs can find it.
        self.busy_tasks_shares[share['id']] = copy
        self.db.share_update(
            context, share['id'],
            {'task_state': constants.TASK_STATE_DATA_COPYING_IN_PROGRESS})

        copied = False
        try:
            copy.run()
            self.db.share_update(
                context, share['id'],
                {'task_state': constants.TASK_STATE_DATA_COPYING_COMPLETING})
            if copy.get_progress()['total_progress'] == 100:
                copied = True
        except Exception:
            LOG.exception("Failed to copy data from source to destination "
                          "%(src)s to %(dest)s.",
                          {'src': info_src['mount_point'],
                           'dest': info_dest['mount_point']})

        # Unconditional cleanup: unmount both ends and drop access rules.
        # Each step only logs on failure so all steps are attempted.
        try:
            helper_src.unmount_share_instance_or_backup(info_src,
                                                        mount_path)
        except Exception:
            LOG.exception("Could not unmount src %s after its data copy.",
                          info_src['mount_point'])

        try:
            helper_dest.unmount_share_instance_or_backup(info_dest,
                                                         mount_path)
        except Exception:
            LOG.exception("Could not unmount dest %s after its data copy.",
                          info_dest['mount_point'])

        try:
            if info_src['share_instance_id']:
                helper_src.deny_access_to_data_service(access_ref_src,
                                                       share_instance_src)
        except Exception:
            LOG.exception("Could not deny access to src instance %s after "
                          "its data copy.", info_src['share_instance_id'])

        try:
            if info_dest['share_instance_id']:
                helper_dest.deny_access_to_data_service(access_ref_dest,
                                                        share_instance_dest)
        except Exception:
            LOG.exception("Could not deny access to dest instance %s after "
                          "its data copy.", info_dest['share_instance_id'])

        # Report final outcome: cancelled beats failed beats completed.
        if copy and copy.cancelled:
            self.db.share_update(
                context, share['id'],
                {'task_state': constants.TASK_STATE_DATA_COPYING_CANCELLED})
            LOG.warning("Copy of data from source "
                        "%(src)s to destination %(dest)s was cancelled.",
                        {'src': info_src['mount_point'],
                         'dest': info_dest['mount_point']})
            raise exception.ShareDataCopyCancelled()
        elif not copied:
            msg = _("Copying data from source %(src)s "
                    "to destination %(dest)s did not succeed.") % (
                {'src': info_src['mount_point'],
                 'dest': info_dest['mount_point']})
            raise exception.ShareDataCopyFailed(reason=msg)

        self.db.share_update(
            context, share['id'],
            {'task_state': constants.TASK_STATE_DATA_COPYING_COMPLETED})

        LOG.debug("Copy of data from source %(src)s to destination "
                  "%(dest)s was successful.", {
                      'src': info_src['mount_point'],
                      'dest': info_dest['mount_point']})
    def create_backup(self, context, backup):
        """Create a backup of a share, via driver or generic data copy.

        On failure the share is returned to 'available' and the backup is
        marked 'error'; on success both become 'available' with progress
        set to '100'.
        """
        share_id = backup['share_id']
        backup_id = backup['id']
        share = self.db.share_get(context, share_id)
        # Re-fetch the backup to work with a fresh DB record.
        backup = self.db.share_backup_get(context, backup_id)

        # Claim this backup for the current host.
        self.db.share_backup_update(context, backup_id, {'host': self.host})

        LOG.info('Create backup started, backup: %(backup_id)s '
                 'share: %(share_id)s.',
                 {'backup_id': backup_id, 'share_id': share_id})

        try:
            # Drivers that manage data transfer themselves bypass the
            # generic mount-and-copy path.
            if self.backup_driver.use_data_manager is False:
                self.backup_driver.backup(context, backup, share)
            else:
                self._run_backup(context, backup, share)
        except Exception as err:
            with excutils.save_and_reraise_exception():
                LOG.error("Failed to create share backup %s by data driver.",
                          backup['id'])
                self.db.share_update(
                    context, share_id,
                    {'status': constants.STATUS_AVAILABLE})
                self.db.share_backup_update(
                    context, backup_id,
                    {'status': constants.STATUS_ERROR, 'fail_reason': err})

        self.db.share_update(
            context, share_id, {'status': constants.STATUS_AVAILABLE})
        self.db.share_backup_update(
            context, backup_id,
            {'status': constants.STATUS_AVAILABLE, 'progress': '100'})
        LOG.info("Created share backup %s successfully.", backup_id)
427 @periodic_task.periodic_task(
428 spacing=CONF.backup_continue_update_interval)
429 def create_backup_continue(self, context):
430 filters = {
431 'status': constants.STATUS_CREATING,
432 'host': self.host,
433 'topic': CONF.data_topic
434 }
435 backups = self.db.share_backups_get_all(context, filters)
437 for backup in backups:
438 backup_id = backup['id']
439 share_id = backup['share_id']
440 share = self.db.share_get(context, share_id)
441 result = {}
442 try:
443 if self.backup_driver.use_data_manager is False:
444 progress = self.backup_driver.get_backup_progress(
445 context, backup, share)
446 else:
447 result = self.data_copy_get_progress(context, share_id)
448 progress = result.get('total_progress', '0')
449 backup_values = {'progress': progress}
450 if progress == '100':
451 self.db.share_update(
452 context, share_id,
453 {'status': constants.STATUS_AVAILABLE})
454 backup_values.update(
455 {'status': constants.STATUS_AVAILABLE})
456 LOG.info("Created share backup %s successfully.",
457 backup_id)
458 self.db.share_backup_update(
459 context, backup_id, backup_values)
460 except Exception:
461 LOG.warning("Failed to get progress of share %(share)s "
462 "backing up in share_backup %(backup).",
463 {'share': share_id, 'backup': backup_id})
464 self.db.share_update(
465 context, share_id,
466 {'status': constants.STATUS_AVAILABLE})
467 self.db.share_backup_update(
468 context, backup_id,
469 {'status': constants.STATUS_ERROR, 'progress': '0'})
471 def _get_share_mount_info(self, share_instance):
472 mount_template = CONF.backup_share_mount_template
474 path = next((x['path'] for x in share_instance['export_locations']
475 if x['is_admin_only']), None)
476 if not path:
477 path = share_instance['export_locations'][0]['path']
479 format_args = {
480 'proto': share_instance['share_proto'].lower(),
481 'export': path,
482 'path': '%(path)s',
483 'options': '%(options)s',
484 }
486 unmount_template = CONF.backup_share_unmount_template
487 mount_info = {
488 'mount': mount_template % format_args,
489 'unmount': unmount_template,
490 }
491 return mount_info
493 def _get_backup_access_mapping(self, share):
494 mapping = CONF.backup_protocol_access_mapping
495 result = {}
496 share_proto = share['share_proto'].lower()
497 for access_type, protocols in mapping.items():
498 if share_proto in [y.lower() for y in protocols]:
499 result[access_type] = result.get(access_type, [])
500 result[access_type].append(share_proto)
501 return result
    def _run_backup(self, context, backup, share):
        """Back up a share by copying its contents to the backup location.

        Builds source (share instance) and destination (backup mount)
        endpoint descriptions and delegates to _copy_share_data.

        :raises exception.ShareDataCopyFailed: if the copy fails
        """
        share_instance_id = share.instance.get('id')
        share_instance = self.db.share_instance_get(
            context, share_instance_id, with_share_data=True)

        access_mapping = self._get_backup_access_mapping(share)
        ignore_list = CONF.backup_ignore_files
        mount_path = CONF.mount_tmp_location
        backup_mount_path = CONF.backup_mount_tmp_location

        mount_info = self._get_share_mount_info(share_instance)
        dest_backup_info = self.backup_driver.get_backup_info(backup)

        # Backup data is written to <mount_point>/<backup_id> on the
        # backup share.
        dest_backup_mount_point = os.path.join(backup_mount_path, backup['id'])
        backup_folder = os.path.join(dest_backup_mount_point, backup['id'])

        try:
            copy = data_utils.Copy(
                os.path.join(mount_path, share_instance_id),
                backup_folder,
                ignore_list)

            info_src = {
                'share_id': share['id'],
                'share_instance_id': share_instance_id,
                'mount': mount_info['mount'],
                'unmount': mount_info['unmount'],
                'mount_point': os.path.join(mount_path, share_instance_id),
                'access_mapping': access_mapping
            }

            info_dest = {
                # Destination is the backup location, not a share instance.
                'share_id': None,
                'share_instance_id': None,
                'backup': True,
                'backup_id': backup['id'],
                'mount': dest_backup_info['mount'],
                'unmount': dest_backup_info['unmount'],
                'mount_point': dest_backup_mount_point,
                'access_mapping': access_mapping
            }
            self._copy_share_data(context, copy, info_src, info_dest)
            self.db.share_update(context, share['id'], {'task_state': None})
        except Exception:
            self.db.share_update(
                context, share['id'],
                {'task_state': constants.TASK_STATE_DATA_COPYING_ERROR})
            msg = _("Failed to copy contents from share %(src)s to "
                    "backup %(dest)s.") % (
                {'src': share_instance_id, 'dest': backup['id']})
            LOG.exception(msg)
            raise exception.ShareDataCopyFailed(reason=msg)
        finally:
            self.busy_tasks_shares.pop(share['id'], None)
    def delete_backup(self, context, backup):
        """Delete a backup, via driver or by wiping the backup folder.

        In the generic path the backup share is mounted, the backup's
        folder contents are removed (skipping configured ignore files),
        and the share is unmounted. Quota is then released and the backup
        record deleted.
        """
        backup_id = backup['id']
        LOG.info('Delete backup started, backup: %s.', backup_id)

        backup = self.db.share_backup_get(context, backup_id)
        try:
            if self.backup_driver.use_data_manager is False:
                self.backup_driver.delete(context, backup)
            else:
                dest_backup_info = self.backup_driver.get_backup_info(backup)
                backup_mount_path = CONF.backup_mount_tmp_location
                mount_point = os.path.join(backup_mount_path, backup['id'])
                backup_folder = os.path.join(mount_point, backup['id'])
                # Ensure the local mount point hierarchy exists before
                # mounting.
                if not os.path.exists(backup_folder):
                    os.makedirs(backup_folder)
                # NOTE(review): this second check can only fail if makedirs
                # silently failed to create the path — defensive, likely
                # unreachable.
                if not os.path.exists(backup_folder):
                    raise exception.NotFound("Path %s could not be "
                                             "found." % backup_folder)

                mount_template = dest_backup_info['mount']
                unmount_template = dest_backup_info['unmount']
                mount_command = mount_template % {'path': mount_point}
                unmount_command = unmount_template % {'path': mount_point}
                utils.execute(*(mount_command.split()), run_as_root=True)

                # backup_folder should exist after mount, else backup is
                # already deleted
                if os.path.exists(backup_folder):
                    for filename in os.listdir(backup_folder):
                        if filename in CONF.backup_ignore_files:
                            continue
                        file_path = os.path.join(backup_folder, filename)
                        try:
                            if (os.path.isfile(file_path) or
                                    os.path.islink(file_path)):
                                os.unlink(file_path)
                            elif os.path.isdir(file_path):
                                shutil.rmtree(file_path)
                        except Exception as e:
                            # Best effort: log and keep deleting the rest.
                            LOG.debug("Failed to delete %(file_path)s. Reason:"
                                      " %(err)s", {'file_path': file_path,
                                                   'err': e})
                    shutil.rmtree(backup_folder)
                utils.execute(*(unmount_command.split()), run_as_root=True)
        except Exception:
            with excutils.save_and_reraise_exception():
                LOG.error("Failed to delete share backup %s.", backup['id'])
                self.db.share_backup_update(
                    context, backup['id'],
                    {'status': constants.STATUS_ERROR_DELETING})

        # Release the quota held by this backup (count and gigabytes).
        try:
            reserve_opts = {
                'backups': -1,
                'backup_gigabytes': -backup['size'],
            }
            reservations = QUOTAS.reserve(
                context, project_id=backup['project_id'], **reserve_opts)
        except Exception as e:
            reservations = None
            LOG.warning("Failed to update backup quota for %(pid)s: %(err)s.",
                        {'pid': backup['project_id'], 'err': e})
            raise

        if reservations:
            QUOTAS.commit(context, reservations,
                          project_id=backup['project_id'])

        self.db.share_backup_delete(context, backup_id)
        LOG.info("Share backup %s deleted successfully.", backup_id)
629 def restore_backup(self, context, backup, share_id):
630 backup_id = backup['id']
631 LOG.info('Restore backup started, backup: %(backup_id)s '
632 'share: %(share_id)s.',
633 {'backup_id': backup['id'], 'share_id': share_id})
635 share = self.db.share_get(context, share_id)
636 backup = self.db.share_backup_get(context, backup_id)
638 try:
639 if (self.backup_driver.restore_to_target_support is False and
640 share['id'] != backup['share_id']):
641 msg = _("Cannot restore backup %(backup)s to target share "
642 "%(share)s as backup driver does not provide support "
643 " for targeted restores") % (
644 {'backup': backup['id'], 'share': share['id']}
645 )
646 LOG.exception(msg)
647 raise exception.BackupException(reason=msg)
649 if self.backup_driver.use_data_manager is False:
650 self.backup_driver.restore(context, backup, share)
651 else:
652 self._run_restore(context, backup, share)
653 except Exception:
654 with excutils.save_and_reraise_exception():
655 LOG.error("Failed to restore backup %(backup)s to share "
656 "%(share)s by data driver.",
657 {'backup': backup['id'], 'share': share_id})
658 self.db.share_update(
659 context, share_id,
660 {'status': constants.STATUS_BACKUP_RESTORING_ERROR})
661 self.db.share_backup_update(
662 context, backup_id,
663 {'status': constants.STATUS_AVAILABLE})
664 self.db.share_update(
665 context, share_id, {'status': constants.STATUS_AVAILABLE})
666 self.db.share_backup_update(
667 context, backup_id,
668 {'status': constants.STATUS_AVAILABLE, 'restore_progress': '100'})
669 LOG.info("Share backup %s restored successfully.", backup_id)
671 @periodic_task.periodic_task(
672 spacing=CONF.restore_continue_update_interval)
673 def restore_backup_continue(self, context):
674 filters = {
675 'status': constants.STATUS_RESTORING,
676 'host': self.host,
677 'topic': CONF.data_topic
678 }
679 backups = self.db.share_backups_get_all(context, filters)
680 for backup in backups:
681 backup_id = backup['id']
682 try:
683 filters = {
684 'source_backup_id': backup_id,
685 }
686 shares = self.db.share_get_all(context, filters)
687 except Exception:
688 LOG.warning('Failed to get shares for backup %s', backup_id)
689 continue
691 for share in shares:
692 if share['status'] != constants.STATUS_BACKUP_RESTORING: 692 ↛ 693line 692 didn't jump to line 693 because the condition on line 692 was never true
693 continue
695 share_id = share['id']
696 result = {}
697 try:
698 if self.backup_driver.use_data_manager is False:
699 progress = self.backup_driver.get_restore_progress(
700 context, backup, share)
701 else:
702 result = self.data_copy_get_progress(context, share_id)
703 progress = result.get('total_progress', '0')
704 backup_values = {'restore_progress': progress}
705 if progress == '100':
706 self.db.share_update(
707 context, share_id,
708 {'status': constants.STATUS_AVAILABLE})
709 backup_values.update(
710 {'status': constants.STATUS_AVAILABLE})
711 LOG.info("Share backup %s restored successfully.",
712 backup_id)
713 self.db.share_backup_update(context, backup_id,
714 backup_values)
715 except Exception:
716 LOG.exception("Failed to get progress of share_backup "
717 "%(backup)s restoring in share %(share).",
718 {'share': share_id, 'backup': backup_id})
719 self.db.share_update(
720 context, share_id,
721 {'status': constants.STATUS_BACKUP_RESTORING_ERROR})
722 self.db.share_backup_update(
723 context, backup_id,
724 {'status': constants.STATUS_AVAILABLE,
725 'restore_progress': '0'})
    def _run_restore(self, context, backup, share):
        """Restore backup contents into a share using the data copy path.

        Mounts the backup location (source) and the share instance
        (destination) and copies everything across via _copy_share_data.

        :raises exception.ShareDataCopyFailed: if the copy fails
        """
        share_instance_id = share.instance.get('id')
        share_instance = self.db.share_instance_get(
            context, share_instance_id, with_share_data=True)

        access_mapping = self._get_backup_access_mapping(share)
        mount_path = CONF.mount_tmp_location
        backup_mount_path = CONF.backup_mount_tmp_location
        ignore_list = CONF.backup_ignore_files

        mount_info = self._get_share_mount_info(share_instance)
        src_backup_info = self.backup_driver.get_backup_info(backup)

        # Backup data lives in <mount_point>/<backup_id> on the backup
        # share.
        src_backup_mount_point = os.path.join(backup_mount_path, backup['id'])
        backup_folder = os.path.join(src_backup_mount_point, backup['id'])

        try:
            copy = data_utils.Copy(
                backup_folder,
                os.path.join(mount_path, share_instance_id),
                ignore_list)

            info_src = {
                # Source is the backup location, not a share instance.
                'share_id': None,
                'share_instance_id': None,
                'restore': True,
                'backup_id': backup['id'],
                'mount': src_backup_info['mount'],
                'unmount': src_backup_info['unmount'],
                'mount_point': src_backup_mount_point,
                'access_mapping': access_mapping
            }

            info_dest = {
                'share_id': share['id'],
                'share_instance_id': share_instance_id,
                'mount': mount_info['mount'],
                'unmount': mount_info['unmount'],
                'mount_point': os.path.join(mount_path, share_instance_id),
                'access_mapping': access_mapping
            }

            self._copy_share_data(context, copy, info_src, info_dest)
            self.db.share_update(context, share['id'], {'task_state': None})
        except Exception:
            self.db.share_update(
                context, share['id'],
                {'task_state': constants.TASK_STATE_DATA_COPYING_ERROR})
            msg = _("Failed to copy/restore contents from backup %(src)s "
                    "to share %(dest)s.") % (
                {'src': backup['id'], 'dest': share_instance_id})
            LOG.exception(msg)
            raise exception.ShareDataCopyFailed(reason=msg)
        finally:
            self.busy_tasks_shares.pop(share['id'], None)