Coverage for manila/share/drivers/windows/windows_utils.py: 98%
113 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Copyright (c) 2015 Cloudbase Solutions SRL
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
16import re
18from oslo_log import log
21LOG = log.getLogger(__name__)
class WindowsUtils(object):
    """Helpers that run disk, path, domain and network commands on a server.

    Every public method builds a command -- either a list of tokens or a
    single string -- and passes it, together with the target ``server``, to
    the ``remote_execute`` callable supplied at construction time. Methods
    that need the command result unpack the callable's return value as an
    ``(out, err)`` pair.
    """

    def __init__(self, remote_execute):
        """Store the executor and precompile the ``fsutil`` output parsers.

        :param remote_execute: callable invoked as
            ``remote_execute(server, cmd)`` (``get_win_reg_value`` also
            passes ``retry=False``).
        """
        self._remote_exec = remote_execute
        # Each pattern captures the digits following the matching label in
        # the output of "fsutil volume diskfree".
        self._fsutil_total_space_regex = re.compile(r'of bytes *: ([0-9]*)')
        self._fsutil_free_space_regex = re.compile(
            r'of avail free bytes *: ([0-9]*)')

    @staticmethod
    def _parse_optional_int(out):
        """Convert command output to an int, or None if it is blank.

        Output made only of whitespace (for instance a lone line break
        printed when nothing matched) is treated the same as empty output
        instead of being handed to ``int()``, which would raise ValueError.
        """
        stripped = (out or "").strip()
        return int(stripped) if stripped else None

    @staticmethod
    def _get_credential_cmds(admin_username, admin_password):
        """Return the commands defining ``$password`` and ``$credential``.

        NOTE(review): both values are interpolated unescaped into
        double-quoted strings; callers presumably pass values free of
        quote and ``$`` characters -- confirm.
        """
        return [
            ('$password = "%s" | '
             'ConvertTo-SecureString -asPlainText -Force' % admin_password),
            ('$credential = '
             'New-Object System.Management.Automation.PSCredential('
             '"%s", $password)' % admin_username),
        ]

    def initialize_disk(self, server, disk_number):
        """Run ``Initialize-Disk`` for the given disk number."""
        cmd = ["Initialize-Disk", "-Number", disk_number]
        self._remote_exec(server, cmd)

    def create_partition(self, server, disk_number):
        """Run ``New-Partition -UseMaximumSize`` on the given disk."""
        cmd = ["New-Partition", "-DiskNumber", disk_number, "-UseMaximumSize"]
        self._remote_exec(server, cmd)

    def format_partition(self, server, disk_number, partition_number):
        """Format the given partition as NTFS without prompting."""
        cmd = ("Get-Partition -DiskNumber %(disk_number)s "
               "-PartitionNumber %(partition_number)s | "
               "Format-Volume -FileSystem NTFS -Force -Confirm:$false" % {
                   'disk_number': disk_number,
                   'partition_number': partition_number,
               })
        self._remote_exec(server, cmd)

    def add_access_path(self, server, mount_path, disk_number,
                        partition_number):
        """Run ``Add-PartitionAccessPath`` with the quoted ``mount_path``."""
        cmd = ["Add-PartitionAccessPath", "-DiskNumber", disk_number,
               "-PartitionNumber", partition_number,
               "-AccessPath", self.quote_string(mount_path)]
        self._remote_exec(server, cmd)

    def resize_partition(self, server, size_bytes, disk_number,
                         partition_number):
        """Run ``Resize-Partition`` with ``size_bytes`` as the new size."""
        cmd = ['Resize-Partition', '-DiskNumber', disk_number,
               '-PartitionNumber', partition_number,
               '-Size', size_bytes]
        self._remote_exec(server, cmd)

    def get_disk_number_by_serial_number(self, server, serial_number):
        """Look up a disk number from a serial number prefix.

        Only the first 15 characters of ``serial_number`` are used, followed
        by a ``*`` wildcard, in a ``-like`` comparison.

        :returns: the disk number as an int, or None when the command
            printed nothing but whitespace.
        """
        pattern = "%s*" % serial_number[:15]
        cmd = ("Get-Disk | "
               "Where-Object {$_.SerialNumber -like '%s'} | "
               "Select-Object -ExpandProperty Number" % pattern)
        out, _err = self._remote_exec(server, cmd)
        return self._parse_optional_int(out)

    def get_disk_number_by_mount_path(self, server, mount_path):
        """Look up the disk number of the partition mounted at a path.

        A trailing backslash is appended to ``mount_path`` before it is
        compared against each partition's access paths.

        :returns: the disk number as an int, or None when the command
            printed nothing but whitespace.
        """
        cmd = ('Get-Partition | '
               'Where-Object {$_.AccessPaths -contains "%s"} | '
               'Select-Object -ExpandProperty DiskNumber' %
               (mount_path + "\\"))
        out, _err = self._remote_exec(server, cmd)
        return self._parse_optional_int(out)

    def get_volume_path_by_mount_path(self, server, mount_path):
        """Return the stripped ``Path`` of the volume mounted at a path."""
        cmd = ('Get-Partition | '
               'Where-Object {$_.AccessPaths -contains "%s"} | '
               'Get-Volume | '
               'Select-Object -ExpandProperty Path' %
               (mount_path + "\\"))
        out, _err = self._remote_exec(server, cmd)
        return out.strip()

    def get_disk_space_by_path(self, server, mount_path):
        """Return ``(total_bytes, free_bytes)`` parsed from ``fsutil``.

        :raises IndexError: if either expected label is missing from the
            command output.
        """
        cmd = ["fsutil", "volume", "diskfree",
               self.quote_string(mount_path)]
        out, _err = self._remote_exec(server, cmd)

        total_bytes = int(self._fsutil_total_space_regex.findall(out)[0])
        free_bytes = int(self._fsutil_free_space_regex.findall(out)[0])
        return total_bytes, free_bytes

    def get_partition_maximum_size(self, server, disk_number,
                                   partition_number):
        """Return the partition's ``SizeMax`` as an int."""
        cmd = ('Get-PartitionSupportedSize -DiskNumber %(disk_number)s '
               '-PartitionNumber %(partition_number)s | '
               'Select-Object -ExpandProperty SizeMax' %
               dict(disk_number=disk_number,
                    partition_number=partition_number))
        out, _err = self._remote_exec(server, cmd)
        return int(out)

    def set_disk_online_status(self, server, disk_number, online=True):
        """Run ``Set-Disk -IsOffline`` with 0 for online, 1 for offline."""
        is_offline = int(not online)
        cmd = ["Set-Disk", "-Number", disk_number, "-IsOffline", is_offline]
        self._remote_exec(server, cmd)

    def set_disk_readonly_status(self, server, disk_number, readonly=False):
        """Run ``Set-Disk -IsReadOnly`` with ``readonly`` as 0 or 1."""
        cmd = ["Set-Disk", "-Number", disk_number,
               "-IsReadOnly", int(readonly)]
        self._remote_exec(server, cmd)

    def update_disk(self, server, disk_number):
        """Updates cached disk information."""
        cmd = ["Update-Disk", disk_number]
        self._remote_exec(server, cmd)

    def join_domain(self, server, domain, admin_username, admin_password):
        """Join the server to an Active Directory domain.

        ``server`` must support ``server['ip']``, which is only used for
        the log message.
        """
        # NOTE(lpetrut): An instance reboot is needed but this will be
        # performed using Nova so that the instance state can be
        # retrieved easier.
        LOG.info("Joining server %(ip)s to Active Directory "
                 "domain %(domain)s", dict(ip=server['ip'],
                                           domain=domain))
        cmds = self._get_credential_cmds(admin_username, admin_password)
        cmds.append('Add-Computer -DomainName "%s" -Credential $credential' %
                    domain)

        self._remote_exec(server, ";".join(cmds))

    def unjoin_domain(self, server, admin_username, admin_password,
                      reboot=False):
        """Remove the server from its domain using the given credentials.

        NOTE(review): ``reboot`` is accepted but never used by this method.
        """
        cmds = self._get_credential_cmds(admin_username, admin_password)
        cmds.append('Remove-Computer -UnjoinDomaincredential $credential '
                    '-Passthru -Verbose -Force')

        self._remote_exec(server, ";".join(cmds))

    def get_current_domain(self, server):
        """Return the stripped ``Domain`` of ``Win32_ComputerSystem``."""
        cmd = "(Get-WmiObject Win32_ComputerSystem).Domain"
        out, _err = self._remote_exec(server, cmd)
        return out.strip()

    def ensure_directory_exists(self, server, path):
        """Run ``New-Item -ItemType Directory -Force`` for ``path``."""
        cmd = ["New-Item", "-ItemType", "Directory",
               "-Force", "-Path", self.quote_string(path)]
        self._remote_exec(server, cmd)

    def remove(self, server, path, force=True, recurse=False,
               is_junction=False):
        """Delete ``path`` if ``path_exists`` reports it as present.

        :param force: adds ``-Force`` to ``Remove-Item``; ignored when
            ``is_junction`` is set.
        :param recurse: adds ``-Recurse`` to ``Remove-Item``, or is passed
            as 0/1 to ``[System.IO.Directory]::Delete`` for junctions.
        :param is_junction: use ``[System.IO.Directory]::Delete`` instead
            of ``Remove-Item``.
        """
        if not self.path_exists(server, path):
            LOG.debug("Skipping deleting path %s as it does "
                      "not exist.", path)
            return

        if is_junction:
            cmd = ('[System.IO.Directory]::Delete('
                   '%(path)s, %(recurse)d)'
                   % dict(path=self.quote_string(path),
                          recurse=recurse))
        else:
            cmd = ["Remove-Item", "-Confirm:$false",
                   "-Path", self.quote_string(path)]
            if force:
                cmd += ['-Force']
            if recurse:
                cmd += ['-Recurse']
        self._remote_exec(server, cmd)

    def path_exists(self, server, path):
        """Return True when ``Test-Path`` prints ``True`` for ``path``.

        NOTE(review): unlike the other path-taking methods, ``path`` is
        passed here without ``quote_string`` -- confirm this is intended.
        """
        cmd = ["Test-Path", path]
        out, _ = self._remote_exec(server, cmd)
        return out.strip() == "True"

    def normalize_path(self, path):
        """Return ``path`` with every forward slash made a backslash."""
        return path.replace('/', '\\')

    def get_interface_index_by_ip(self, server, ip):
        """Return the ``InterfaceIndex`` owning ``ip`` as an int."""
        cmd = ('Get-NetIPAddress | '
               'Where-Object {$_.IPAddress -eq "%(ip)s"} | '
               'Select-Object -ExpandProperty InterfaceIndex' %
               dict(ip=ip))

        out, _err = self._remote_exec(server, cmd)
        return int(out)

    def set_dns_client_search_list(self, server, search_list):
        """Set the DNS suffix search list to the given domains."""
        src_list = ",".join("'%s'" % domain for domain in search_list)

        cmd = ["Set-DnsClientGlobalSetting",
               "-SuffixSearchList", "@(%s)" % src_list]
        self._remote_exec(server, cmd)

    def set_dns_client_server_addresses(self, server, if_index, dns_servers):
        """Set the DNS server addresses of the interface ``if_index``."""
        dns_sv_list = ",".join("'%s'" % dns_sv for dns_sv in dns_servers)
        cmd = ["Set-DnsClientServerAddress",
               "-InterfaceIndex", if_index,
               "-ServerAddresses", "(%s)" % dns_sv_list]
        self._remote_exec(server, cmd)

    def set_win_reg_value(self, server, path, key, value):
        """Run ``Set-ItemProperty`` to store ``value`` under ``key``."""
        cmd = ['Set-ItemProperty', '-Path', self.quote_string(path),
               '-Name', key, '-Value', value]
        self._remote_exec(server, cmd)

    def get_win_reg_value(self, server, path, name=None):
        """Return the output of ``Get-ItemProperty`` for ``path``.

        When ``name`` is given, only that property is expanded. The command
        is executed with ``retry=False`` and its raw first result element
        is returned unstripped.
        """
        cmd = "Get-ItemProperty -Path %s" % self.quote_string(path)
        if name:
            cmd += " | Select-Object -ExpandProperty %s" % name
        return self._remote_exec(server, cmd, retry=False)[0]

    def quote_string(self, string):
        """Wrap ``string`` in double quotes; no escaping is performed."""
        return '"%s"' % string