1 Star 0 Fork 0

Da yang/Devops

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
sshclient.py 1.53 KB
一键复制 编辑 原始数据 按行查看 历史
Da yang 提交于 2021-05-08 09:17 +08:00 . last update
# encoding=utf8
# from flask_socketio import SocketIO, emit
import paramiko
# import socketio
# socketio = SocketIO()
class SSHProxy(object):
def __init__(self, hostname, port, username, password):
self.hostname = hostname
self.port = port
self.username = username
self.password = password
self.transport = None
self.event_name = 'dcenter'
self.name_space = '/dcenter'
def open(self):
self.transport = paramiko.Transport((self.hostname, self.port))
self.transport.connect(username=self.username, password=self.password)
def command(self, cmd):
xssh = paramiko.SSHClient()
xssh._transport = self.transport
stdin, stdout, stderr = xssh.exec_command(cmd, get_pty=True, timeout=3)
status = stdout.channel
stdout = stdout
code = status.recv_exit_status()
result = (stdout.read().decode('utf-8'))
res = {'code': code, 'result': str(result), 'stderr': stderr, 'stdout': stdout}
return res
# return result, stderr
def upload(self, local_path, remote_path):
sftp = paramiko.SFTPClient.from_transport(self.transport)
sftp.put(local_path, remote_path)
sftp.close()
def close(self):
self.transport.close()
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if __name__ == '__main__':
with SSHProxy('192.168.64.149', 22, 'root', '123456') as ssh:
ssh.command('pwd')
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/shyanglin/Devops.git
git@gitee.com:shyanglin/Devops.git
shyanglin
Devops
Devops
master

搜索帮助