Coverage for manila/ssh_utils.py: 80%
123 statements
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2026-02-18 22:19 +0000
1# Licensed under the Apache License, Version 2.0 (the "License"); you may
2# not use this file except in compliance with the License. You may obtain
3# a copy of the License at
4#
5# http://www.apache.org/licenses/LICENSE-2.0
6#
7# Unless required by applicable law or agreed to in writing, software
8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10# License for the specific language governing permissions and limitations
11# under the License.
13"""Ssh utilities."""
15from collections import deque
16from contextlib import contextmanager
17import hashlib
18import logging
19import os
20import threading
22from oslo_config import cfg
23from oslo_log import log
25from manila import exception
26from manila.i18n import _
29try:
30 import paramiko
31except ImportError:
32 paramiko = None
34CONF = cfg.CONF
35LOG = log.getLogger(__name__)
36if getattr(CONF, 'debug', False): 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true
37 logging.getLogger("paramiko").setLevel(logging.DEBUG)
40def get_fingerprint(self):
41 """Patch paramiko
43 This method needs to be patched to allow paramiko to work under FIPS.
44 Until the patch to do this merges, patch paramiko here.
46 TODO(carloss) Remove this when paramiko is patched.
47 See https://github.com/paramiko/paramiko/pull/1928
48 """
49 return hashlib.md5(self.asbytes(), usedforsecurity=False).digest()
52if paramiko is None: 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true
53 raise exception.RequirementMissing(req='paramiko')
55paramiko.pkey.PKey.get_fingerprint = get_fingerprint
58class SSHPool:
59 """A thread-safe SSH connection pool."""
61 def __init__(self, ip, port, conn_timeout, login, password=None,
62 privatekey=None, min_size=1, max_size=10):
63 self.ip = ip
64 self.port = port
65 self.login = login
66 self.password = password
67 self.conn_timeout = conn_timeout if conn_timeout else None
68 self.path_to_private_key = privatekey
69 self.min_size = min_size
70 self.max_size = max_size
72 # Concurrent connection management
73 self._lock = threading.RLock()
74 self._connections = deque()
75 self._current_size = 0
76 self._condition = threading.Condition(self._lock)
78 def create(self, quiet=False):
79 """Create one new SSH connection."""
80 ssh = paramiko.SSHClient()
81 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
82 look_for_keys = True
83 if self.path_to_private_key:
84 self.path_to_private_key = os.path.expanduser(
85 self.path_to_private_key)
86 look_for_keys = False
87 elif self.password:
88 look_for_keys = False
89 try:
90 LOG.debug("ssh.connect: ip: %s, port: %s, look_for_keys: %s, "
91 "timeout: %s, banner_timeout: %s",
92 self.ip,
93 self.port,
94 look_for_keys,
95 self.conn_timeout,
96 self.conn_timeout)
97 ssh.connect(self.ip,
98 port=self.port,
99 username=self.login,
100 password=self.password,
101 key_filename=self.path_to_private_key,
102 look_for_keys=look_for_keys,
103 timeout=self.conn_timeout,
104 banner_timeout=self.conn_timeout)
105 if self.conn_timeout:
106 transport = ssh.get_transport()
107 transport.set_keepalive(self.conn_timeout)
108 return ssh
109 except Exception as e:
110 msg = _("Check whether private key or password are correctly "
111 "set. Error connecting via ssh: %s") % e
112 if quiet: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true
113 LOG.debug(msg)
114 else:
115 LOG.error(msg)
116 raise exception.SSHException(msg)
118 def get(self):
119 """Return an item from the pool, when one is available.
121 This method will block if no connections are available and the pool
122 is at maximum capacity. Check if a connection is active before
123 returning it. For dead connections create and return a new connection.
124 """
125 with self._condition:
126 # Try to get an existing connection
127 while True:
128 if self._connections:
129 conn = self._connections.popleft()
130 if conn and self._is_connection_active(conn): 130 ↛ 134line 130 didn't jump to line 134 because the condition on line 130 was always true
131 return conn
132 else:
133 # Connection is dead, close it and try again
134 if conn:
135 self._close_connection(conn)
136 self._current_size -= 1
137 continue
139 # No active connections available
140 if self._current_size < self.max_size:
141 # Create new connection
142 conn = self.create()
143 if conn: 143 ↛ 148line 143 didn't jump to line 148 because the condition on line 143 was always true
144 self._current_size += 1
145 return conn
147 # Pool is at max capacity, wait for a connection
148 self._condition.wait(timeout=30)
149 # If we timeout, try to create anyway
150 if (not self._connections and 150 ↛ 152line 150 didn't jump to line 152 because the condition on line 150 was never true
151 self._current_size < self.max_size):
152 conn = self.create()
153 if conn:
154 self._current_size += 1
155 return conn
157 def put(self, conn):
158 """Return a connection to the pool."""
159 if not conn: 159 ↛ 160line 159 didn't jump to line 160 because the condition on line 159 was never true
160 return
162 with self._condition:
163 if self._is_connection_active(conn):
164 self._connections.append(conn)
165 else:
166 self._close_connection(conn)
167 if self._current_size > 0: 167 ↛ 169line 167 didn't jump to line 169 because the condition on line 167 was always true
168 self._current_size -= 1
169 self._condition.notify()
171 def remove(self, ssh):
172 """Close an ssh client and remove it from the pool."""
173 with self._lock:
174 if ssh in self._connections:
175 self._connections.remove(ssh)
176 self._close_connection(ssh)
177 if self._current_size > 0: 177 ↛ exitline 177 didn't jump to the function exit
178 self._current_size -= 1
180 @contextmanager
181 def item(self):
182 """Context manager for getting/returning connections."""
183 conn = self.get()
184 try:
185 yield conn
186 finally:
187 self.put(conn)
189 def _is_connection_active(self, conn):
190 """Check if SSH connection is still active."""
191 try:
192 return (conn and
193 conn.get_transport() and
194 conn.get_transport().is_active())
195 except Exception:
196 return False
198 def _close_connection(self, conn):
199 """Safely close an SSH connection."""
200 try:
201 if conn: 201 ↛ exitline 201 didn't return from function '_close_connection' because the condition on line 201 was always true
202 conn.close()
203 except Exception:
204 pass # Ignore errors when closing
206 # Properties for backward compatibility with eventlet.pools.Pool
207 @property
208 def current_size(self):
209 """Current number of connections in the pool."""
210 with self._lock:
211 return self._current_size
213 @property
214 def free_items(self):
215 """Available connections (for backward compatibility)."""
216 with self._lock:
217 return self._connections