Bonjour,

J'utilise le framework twisted via le module conch pour créer un serveur SSH.
Mon utilisateur s'authentifie via PAM (cette partie fonctionne).
Une fois authentifié l'utilisateur à le droit à certaines commandes qui sont définient dans la méthode lineReceived de la classe mySSHProtocol.

Si je lance une commande graphique (type curse comme top) ou un shell via os.system, la commande s’exécute sur le serveur et l'output de la commande se trouve également sur le serveur aussi (la sortie de la commande n'est pas transférée au client).
Je pense qu'il faut passer par la méthode execCommand de l'interface ISession ou de la classe twisted.conch.unix.SSHSessionForUnixConchUser (fonction reactor.spawnProcess) :, mais je ne sais pas comment faire... Ça fait 2 jours que je galère... Un petit coup de main serait le bienvenue

Je veux juste faire un shell avec seulement quelques commandes disponibles pour l'utilisateur authentifié.

Code : Sélectionner tout - Visualiser dans une fenêtre à part
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
 
#!/usr/bin/env python
import twisted.cred as cred, twisted.cred.credentials, twisted.cred.checkers, twisted.cred.portal
import twisted.conch as conch, twisted.conch.recvline, twisted.conch.avatar, twisted.conch.checkers, twisted.conch.ssh, twisted.conch.ssh.factory, twisted.conch.ssh.session
import twisted.internet as internet, twisted.internet.defer
import twisted.application as application
from twisted.python.randbytes import secureRandom
from twisted.python import log
from zope.interface import implements
import os, sys, re, socket
import PAM
 
log.startLogging(sys.stderr)
 
class PamPasswordDatabase:
	"""Authentication/authorization backend using the 'login' PAM service"""
	credentialInterfaces = cred.credentials.IUsernamePassword,
	implements(cred.checkers.ICredentialsChecker)
 
        def requestAvatarId(self, credentials):
                service = 'passwd'
                auth = PAM.pam()
                auth.start(service)
                auth.set_item(PAM.PAM_USER, credentials.username)
		auth.set_item(PAM.PAM_CONV, lambda *args: [(credentials.password, 0)])
                try:
                        auth.authenticate()
                        auth.acct_mgmt()
                except(PAM.error):
                        return internet.defer.fail(cred.error.UnauthorizedLogin("invalid password"))
                except:
                        return internet.defer.fail(cred.error.UnauthorizedLogin("internal error"))
                else:
                        return internet.defer.succeed(credentials.username)
 
class mySSHProtocol(conch.recvline.HistoricRecvLine):
	def __init__(self, user):
		self.user = user
 
	#def keystrokeReceived(self, keyID, modifier):
	#	pass
	#	if keyID == '\x04': # CTRL+d (eof)
	#		self.terminal.loseConnection()
		#else if keyID == '\x0c': # CTRL+l
		#	pass
		#	self.terminal.eraseDisplay() #(clear screen)
		#else if keyID == '\x03' # CTRL+c
		#	self._call.stop()
	#	return
 
	def connectionMade(self):
		conch.recvline.HistoricRecvLine.connectionMade(self)
		self.terminal.write("My SSH")
	  	self.terminal.nextLine()
	  	self.showPrompt()
 
	def showPrompt(self):
	  	self.terminal.write("%s > " % socket.gethostname())
 
	def lineReceived(self, line):
		# !!!!! C'est la que ça foire !!!!!
		if line == 'htop':
			self.terminal.write(os.system('htop'))
 
		if line == 'exit':
			self.terminal.loseConnection()
 
 
class mySSHAvatar(conch.avatar.ConchUser):
	implements(conch.interfaces.ISession)
 
	def __init__(self, username):
 		conch.avatar.ConchUser.__init__(self)
		self.username = username
		self.channelLookup.update({'session':conch.ssh.session.SSHSession})
 
	def openShell(self, protocol):
		serverProtocol = conch.insults.insults.ServerProtocol(mySSHProtocol, self)
		serverProtocol.makeConnection(protocol)
		protocol.makeConnection(conch.ssh.session.wrapProtocol(serverProtocol))
 
	def getPty(self, terminal, windowSize, attrs):
		return None
 
	def execCommand(self, protocol, cmd):
    		pass
 
	def closed(self):
		pass
 
 
class mySSHRealm:
	implements(cred.portal.IRealm)
 
	def requestAvatar(self, avatarId, mind, *interfaces):
		if conch.interfaces.IConchUser in interfaces:
			return(interfaces[0], mySSHAvatar(avatarId), lambda: None)
		else:
			raise(Exception, "No supported interfaces found.")
 
def getRSAKeys():
	from Crypto.PublicKey import RSA
	dir_keys = os.path.join(os.path.dirname(sys.argv[0]), 'ssh_keys')
	publicKey = os.path.join(dir_keys, 'public.key')
	privateKey = os.path.join(dir_keys, 'private.key')
 
	try:
		if not os.path.isdir(dir_keys):
			os.mkdir(dir_keys)
 
		if not os.path.isfile(publicKey) or not os.path.isfile(privateKey):
			key_size = 4096
			log.msg('Generating %i bits RSA keypair...' % key_size)
			cle = RSA.generate(key_size, secureRandom)
			file(publicKey, 'w').write(keys.Key(cle).public().toString('openssh'))
			file(privateKey, 'w').write(keys.Key(cle).toString('openssh'))
			os.chmod(privateKey, 0400)
	except:
		log.err('Unable to create SSH keys, please verify write access on %s' % dir_keys)
		sys.exit(1)
 
	try:
		publicKeyString = file(publicKey, 'r').read()
		privateKeyString = file(privateKey, 'r').read()
	except:
		log.err('Unable to read SSH keys, please verify read access on %s' % dir_keys)
		sys.exit(1)
 
	return publicKeyString, privateKeyString
 
if __name__ == "__main__":
	sshFactory = conch.ssh.factory.SSHFactory()
	sshFactory.portal = cred.portal.Portal(mySSHRealm())
	#users = {'admin': 'aaa', 'guest': 'bbb'}
	#sshFactory.portal.registerChecker(cred.checkers.InMemoryUsernamePasswordDatabaseDontUse(**users))
	sshFactory.portal.registerChecker(PamPasswordDatabase())   # Supports PAM
	sshFactory.portal.registerChecker(conch.checkers.SSHPublicKeyDatabase())  # Supports PKI
	pubKeyString, privKeyString = getRSAKeys()
	sshFactory.publicKeys = {'ssh-rsa': conch.ssh.keys.Key.fromString(data = pubKeyString)}
	sshFactory.privateKeys = {'ssh-rsa': conch.ssh.keys.Key.fromString(data = privKeyString)}
	internet.reactor.listenTCP(2222, sshFactory)
	internet.reactor.run()
Merci d'avance pour votre aide