Coverage for manila/share/drivers/windows/windows_utils.py: 98%

113 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2026-02-18 22:19 +0000

1# Copyright (c) 2015 Cloudbase Solutions SRL 

2# All Rights Reserved. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); you may 

5# not use this file except in compliance with the License. You may obtain 

6# a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 

12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 

13# License for the specific language governing permissions and limitations 

14# under the License. 

15 

16import re 

17 

18from oslo_log import log 

19 

20 

21LOG = log.getLogger(__name__) 

22 

23 

class WindowsUtils(object):
    """Build Windows / PowerShell commands and run them on a remote server.

    Every public method assembles a command (either a list of arguments
    or a single command string) and hands it, together with the ``server``
    object it received, to the ``remote_execute`` callable supplied at
    construction time. How the command is transported and executed is
    entirely up to that callable.
    """

    # Characters the PowerShell tokenizer accepts as single-quote
    # delimiters: the ASCII apostrophe and four typographic variants.
    # Each of them must be doubled inside a single-quoted literal.
    _PS_SINGLE_QUOTES_REGEX = re.compile("(['\u2018\u2019\u201a\u201b])")

    def __init__(self, remote_execute):
        """Store the remote executor and precompile the fsutil parsers.

        :param remote_execute: callable invoked as
            ``remote_execute(server, cmd)`` (``get_win_reg_value`` also
            passes ``retry=False``). Its result is unpacked as a 2-item
            sequence whose first item is the command output.
        """
        self._remote_exec = remote_execute
        self._fsutil_total_space_regex = re.compile('of bytes *: ([0-9]*)')
        self._fsutil_free_space_regex = re.compile(
            'of avail free bytes *: ([0-9]*)')

    def _quote_literal(self, value):
        """Return ``value`` as a PowerShell single-quoted literal.

        Single-quoted PowerShell strings are not subject to ``$variable``,
        ``$(...)`` or backtick expansion, so the only characters needing
        care are the quote characters themselves, which are doubled.
        """
        escaped = self._PS_SINGLE_QUOTES_REGEX.sub(r"\1\1", str(value))
        return "'%s'" % escaped

    def _get_credential_cmds(self, admin_username, admin_password):
        """Return the commands that define ``$credential`` in PowerShell.

        The password and username are emitted as single-quoted literals
        so that characters such as ``$``, backticks or quotes inside them
        are neither expanded nor able to terminate the string.
        """
        return [
            ('$password = %s | '
             'ConvertTo-SecureString -asPlainText -Force' %
             self._quote_literal(admin_password)),
            ('$credential = '
             'New-Object System.Management.Automation.PSCredential('
             '%s, $password)' % self._quote_literal(admin_username)),
        ]

    @staticmethod
    def _parse_optional_int(out):
        """Parse command output as an int; blank output yields ``None``."""
        out = out.strip()
        return int(out) if out else None

    def initialize_disk(self, server, disk_number):
        """Run ``Initialize-Disk`` for the given disk number."""
        cmd = ["Initialize-Disk", "-Number", disk_number]
        self._remote_exec(server, cmd)

    def create_partition(self, server, disk_number):
        """Run ``New-Partition -UseMaximumSize`` on the given disk."""
        cmd = ["New-Partition", "-DiskNumber", disk_number, "-UseMaximumSize"]
        self._remote_exec(server, cmd)

    def format_partition(self, server, disk_number, partition_number):
        """Format the given partition as NTFS without prompting."""
        cmd = ("Get-Partition -DiskNumber %(disk_number)s "
               "-PartitionNumber %(partition_number)s | "
               "Format-Volume -FileSystem NTFS -Force -Confirm:$false" % {
                   'disk_number': disk_number,
                   'partition_number': partition_number,
               })
        self._remote_exec(server, cmd)

    def add_access_path(self, server, mount_path, disk_number,
                        partition_number):
        """Run ``Add-PartitionAccessPath`` with the quoted mount path."""
        cmd = ["Add-PartitionAccessPath", "-DiskNumber", disk_number,
               "-PartitionNumber", partition_number,
               "-AccessPath", self.quote_string(mount_path)]
        self._remote_exec(server, cmd)

    def resize_partition(self, server, size_bytes, disk_number,
                         partition_number):
        """Run ``Resize-Partition`` with ``size_bytes`` as the new size."""
        cmd = ['Resize-Partition', '-DiskNumber', disk_number,
               '-PartitionNumber', partition_number,
               '-Size', size_bytes]
        self._remote_exec(server, cmd)

    def get_disk_number_by_serial_number(self, server, serial_number):
        """Return the number of the disk matching a serial number prefix.

        Only the first 15 characters of ``serial_number`` are used,
        followed by a ``*`` wildcard for the ``-like`` comparison.

        :returns: the disk number as an int, or ``None`` when the command
            produced no (or only whitespace) output.
        """
        pattern = "%s*" % serial_number[:15]
        cmd = ("Get-Disk | "
               "Where-Object {$_.SerialNumber -like '%s'} | "
               "Select-Object -ExpandProperty Number" % pattern)
        (out, err) = self._remote_exec(server, cmd)
        return self._parse_optional_int(out)

    def get_disk_number_by_mount_path(self, server, mount_path):
        """Return the number of the disk with a partition at a mount path.

        A trailing backslash is appended to ``mount_path`` before it is
        compared against the partition access paths.

        :returns: the disk number as an int, or ``None`` when the command
            produced no (or only whitespace) output.
        """
        cmd = ('Get-Partition | '
               'Where-Object {$_.AccessPaths -contains "%s"} | '
               'Select-Object -ExpandProperty DiskNumber' %
               (mount_path + "\\"))
        (out, err) = self._remote_exec(server, cmd)
        return self._parse_optional_int(out)

    def get_volume_path_by_mount_path(self, server, mount_path):
        """Return the stripped ``Path`` of the volume at a mount path."""
        cmd = ('Get-Partition | '
               'Where-Object {$_.AccessPaths -contains "%s"} | '
               'Get-Volume | '
               'Select-Object -ExpandProperty Path' %
               (mount_path + "\\"))
        (out, err) = self._remote_exec(server, cmd)
        return out.strip()

    def get_disk_space_by_path(self, server, mount_path):
        """Return ``(total_bytes, free_bytes)`` parsed from fsutil output.

        The values are taken from the first ``of bytes :`` and
        ``of avail free bytes :`` entries of ``fsutil volume diskfree``.
        An ``IndexError`` is raised if either entry is missing.
        """
        cmd = ["fsutil", "volume", "diskfree",
               self.quote_string(mount_path)]
        (out, err) = self._remote_exec(server, cmd)

        total_bytes = int(self._fsutil_total_space_regex.findall(out)[0])
        free_bytes = int(self._fsutil_free_space_regex.findall(out)[0])
        return total_bytes, free_bytes

    def get_partition_maximum_size(self, server, disk_number,
                                   partition_number):
        """Return ``SizeMax`` of ``Get-PartitionSupportedSize`` as an int."""
        cmd = ('Get-PartitionSupportedSize -DiskNumber %(disk_number)s '
               '-PartitionNumber %(partition_number)s | '
               'Select-Object -ExpandProperty SizeMax' %
               dict(disk_number=disk_number,
                    partition_number=partition_number))
        (out, err) = self._remote_exec(server, cmd)

        max_bytes = int(out)
        return max_bytes

    def set_disk_online_status(self, server, disk_number, online=True):
        """Run ``Set-Disk -IsOffline`` with 0 (online) or 1 (offline)."""
        is_offline = int(not online)
        cmd = ["Set-Disk", "-Number", disk_number, "-IsOffline", is_offline]
        self._remote_exec(server, cmd)

    def set_disk_readonly_status(self, server, disk_number, readonly=False):
        """Run ``Set-Disk -IsReadOnly`` with 0 or 1."""
        cmd = ["Set-Disk", "-Number", disk_number,
               "-IsReadOnly", int(readonly)]
        self._remote_exec(server, cmd)

    def update_disk(self, server, disk_number):
        """Updates cached disk information."""
        cmd = ["Update-Disk", disk_number]
        self._remote_exec(server, cmd)

    def join_domain(self, server, domain, admin_username, admin_password):
        """Join the server to an Active Directory domain.

        ``server`` must support ``server['ip']``, which is used for
        logging. The domain, username and password are passed to
        PowerShell as single-quoted literals, so they are used verbatim.
        """
        # NOTE(lpetrut): An instance reboot is needed but this will be
        # performed using Nova so that the instance state can be
        # retrieved easier.
        LOG.info("Joining server %(ip)s to Active Directory "
                 "domain %(domain)s", dict(ip=server['ip'],
                                           domain=domain))
        cmds = self._get_credential_cmds(admin_username, admin_password)
        cmds.append('Add-Computer -DomainName %s -Credential $credential' %
                    self._quote_literal(domain))

        cmd = ";".join(cmds)
        self._remote_exec(server, cmd)

    def unjoin_domain(self, server, admin_username, admin_password,
                      reboot=False):
        """Remove the server from its current domain.

        The username and password are passed to PowerShell as
        single-quoted literals, so they are used verbatim. ``reboot`` is
        accepted for interface compatibility but is not used here.
        """
        cmds = self._get_credential_cmds(admin_username, admin_password)
        cmds.append('Remove-Computer -UnjoinDomaincredential $credential '
                    '-Passthru -Verbose -Force')

        cmd = ";".join(cmds)
        self._remote_exec(server, cmd)

    def get_current_domain(self, server):
        """Return the stripped ``Domain`` of ``Win32_ComputerSystem``."""
        cmd = "(Get-WmiObject Win32_ComputerSystem).Domain"
        (out, err) = self._remote_exec(server, cmd)
        return out.strip()

    def ensure_directory_exists(self, server, path):
        """Run ``New-Item -ItemType Directory -Force`` for ``path``."""
        cmd = ["New-Item", "-ItemType", "Directory",
               "-Force", "-Path", self.quote_string(path)]
        self._remote_exec(server, cmd)

    def remove(self, server, path, force=True, recurse=False,
               is_junction=False):
        """Delete ``path`` if it exists; otherwise only log a debug note.

        :param force: add ``-Force`` to ``Remove-Item``.
        :param recurse: add ``-Recurse`` to ``Remove-Item``, or pass it as
            the second argument of ``Directory.Delete`` for junctions.
        :param is_junction: delete through
            ``[System.IO.Directory]::Delete`` instead of ``Remove-Item``;
            ``force`` is not used in that case.
        """
        if self.path_exists(server, path):
            if is_junction:
                cmd = ('[System.IO.Directory]::Delete('
                       '%(path)s, %(recurse)d)'
                       % dict(path=self.quote_string(path),
                              recurse=recurse))
            else:
                cmd = ["Remove-Item", "-Confirm:$false",
                       "-Path", self.quote_string(path)]
                if force:
                    cmd += ['-Force']
                if recurse:
                    cmd += ['-Recurse']
            self._remote_exec(server, cmd)
        else:
            LOG.debug("Skipping deleting path %s as it does "
                      "not exist.", path)

    def path_exists(self, server, path):
        """Return True if ``Test-Path`` outputs ``True`` for ``path``."""
        # Quote the path like every other path-taking command here, so
        # that paths containing spaces stay a single argument.
        cmd = ["Test-Path", self.quote_string(path)]
        (out, _) = self._remote_exec(server, cmd)
        return out.strip() == "True"

    def normalize_path(self, path):
        """Return ``path`` with forward slashes replaced by backslashes."""
        return path.replace('/', '\\')

    def get_interface_index_by_ip(self, server, ip):
        """Return the ``InterfaceIndex`` of the given IP address as an int.

        The command output is converted with ``int`` directly, so output
        that is not a single integer raises ``ValueError``.
        """
        cmd = ('Get-NetIPAddress | '
               'Where-Object {$_.IPAddress -eq "%(ip)s"} | '
               'Select-Object -ExpandProperty InterfaceIndex' %
               dict(ip=ip))

        (out, err) = self._remote_exec(server, cmd)
        if_index = int(out)
        return if_index

    def set_dns_client_search_list(self, server, search_list):
        """Set the DNS suffix search list to the given domains."""
        src_list = ",".join(["'%s'" % domain for domain in search_list])

        cmd = ["Set-DnsClientGlobalSetting",
               "-SuffixSearchList", "@(%s)" % src_list]
        self._remote_exec(server, cmd)

    def set_dns_client_server_addresses(self, server, if_index, dns_servers):
        """Set the DNS server addresses of the given interface index."""
        dns_sv_list = ",".join(["'%s'" % dns_sv for dns_sv in dns_servers])
        cmd = ["Set-DnsClientServerAddress",
               "-InterfaceIndex", if_index,
               "-ServerAddresses", "(%s)" % dns_sv_list]
        self._remote_exec(server, cmd)

    def set_win_reg_value(self, server, path, key, value):
        """Run ``Set-ItemProperty`` for ``key`` under registry ``path``."""
        cmd = ['Set-ItemProperty', '-Path', self.quote_string(path),
               '-Name', key, '-Value', value]
        self._remote_exec(server, cmd)

    def get_win_reg_value(self, server, path, name=None):
        """Return the raw output of ``Get-ItemProperty`` for ``path``.

        When ``name`` is given, only that property is expanded. The
        command is executed with ``retry=False``.
        """
        cmd = "Get-ItemProperty -Path %s" % self.quote_string(path)
        if name:
            cmd += " | Select-Object -ExpandProperty %s" % name
        return self._remote_exec(server, cmd, retry=False)[0]

    def quote_string(self, string):
        """Wrap ``string`` in double quotes, without any escaping."""
        return '"%s"' % string