Hello,

I have created a telnet server in python.

Maybe you wonder why I would create a telnet server when Windows already has one. The reason is that the Windows telnet server does not allow interaction with the desktop: if you try to start a GUI app, it will start and run, but it will not be displayed on the server desktop.

My telnet server allows starting a GUI application that interacts with the Windows desktop of the server.
Ex.: typing "notepad" in the telnet console will pop up Notepad on the Windows desktop of the server.

At the moment this server works, but it has no authentication feature implemented.

I used Twisted and "StatefulTelnetProtocol" class.
It is inspired by the code "simplecred.py" at this address
Here is the code of the server without authentication:

from twisted.internet.protocol import ServerFactory, Protocol
from twisted.conch.telnet import StatefulTelnetProtocol
from twisted.internet import reactor
import new_subprocess
from time import sleep

class ShellTelnet():
    def __init__(self):
        self.shell = 'cmd /u /q'
        self.tail = '\r\n'
        self.MyShell = new_subprocess.Popen(self.shell, stdin=new_subprocess.PIPE, stdout=new_subprocess.PIPE, stderr=new_subprocess.subprocess.STDOUT)
        print "PID ShellTelnet: %d" % self.MyShell.pid

    def getshellpid(self):
        return self.MyShell.pid

    def sendcommand(self, my_cmd):
        print "DEBUB: sendcomand called with *%s*" % repr(my_cmd)
        new_subprocess.send_all(self.MyShell, my_cmd + self.tail)

    def read(self):
        data = 'cls'
        dataread = ''
        while data.strip():
            data = new_subprocess.recv_some(self.MyShell, t=0.1)
            dataread += data
            sleep(0.25)
        print dataread
        return dataread

    def killshell(self):
        import ctypes
        ctypes.windll.kernel32.TerminateProcess(int(self.MyShell._handle), -1)

class MyProtocol(StatefulTelnetProtocol):
    def connectionMade(self):
        print "DEBUG: connectionMade called"
        self.sendLine("***************************************\r\n")
        self.sendLine("Welcome to the Simplified Telnet Server\r\n")
        self.sendLine("***************************************\r\n")
        self.Shell = ShellTelnet()
        self.sendLine("The telnet shell PID on the server is %d" % self.Shell.getshellpid())
        sleep(2)
        self.transport.write(self.Shell.read().strip())
        self.clearLineBuffer()

    def lineReceived(self, line):
        print "DEBUG: lineReceived called with %s" % line
        if line.strip() != "exit":
            self.Shell.sendcommand(line.strip())
            self.transport.write(self.Shell.read().strip())
        else:
            self.Shell.killshell()
            self.sendLine("TelnetShell killed. Bye Bye ...")
            self.transport.loseConnection()
        self.clearLineBuffer()
        
    def connectionLost(self, reason):
        print "DEBUG: connectionLost called with: %s" % str(reason)

def CreateMyFactory():
    """Build and return a ServerFactory whose protocol is MyProtocol."""
    telnet_factory = ServerFactory()
    telnet_factory.protocol = MyProtocol
    return telnet_factory

if __name__ == "__main__":
    # Serve the unauthenticated telnet shell on TCP port 8023 until stopped.
    reactor.listenTCP(8023, CreateMyFactory())
    reactor.run()

Now I would like to implement authentication using the "AuthenticatingTelnetProtocol" class.
I have been trying for two weeks, but I cannot get it to work.
The server starts and listens for connections on port 2323 (the port used in the code below). As soon as a connection is attempted, the server raises an error.
There is something wrong with the way I use "AuthenticatingTelnetProtocol", but I don't know what.

The code of the server with authentication:

from zope.interface import Interface, implements

from twisted.internet.protocol import ServerFactory, Protocol
from twisted.conch.telnet import AuthenticatingTelnetProtocol, StatefulTelnetProtocol, ITelnetProtocol


from twisted.cred import portal, checkers, credentials, error as credError
from twisted.protocols import basic
from twisted.internet import protocol, reactor, defer
from zope.interface import Interface, implements

class PasswordDictChecker(object):
    implements(checkers.ICredentialsChecker)
    credentialInterfaces = (credentials.IUsernamePassword,)
##    credentialInterfaces = (ITelnetProtocol,)

    def __init__(self, passwords):
        "passwords: a dict-like object mapping usernames to passwords"
        print "DEBUG - PasswordDictChecker - __init__"
        self.passwords = passwords
        print "DEBUG - PasswordDictChecker - self.passwords", self.passwords

    def requestAvatarId(self, credentials):
        print "DEBUG - PasswordDictChecker - requestAvatarId - credentials", credentials
        username = credentials.username
        if self.passwords.has_key(username):
            if credentials.password == self.passwords[username]:
                return defer.succeed(username)
            else:
                return defer.fail(
                    credError.UnauthorizedLogin("Bad password"))
        else:
            return defer.fail(
                credError.UnauthorizedLogin("No such user"))

class INamedUserAvatar(Interface):
    "should have attributes username and fullname"
    print "DEBUG - INamedUserAvatar :", Interface

class NamedUserAvatar:
    implements(INamedUserAvatar)
    def __init__(self, username, fullname):
        self.username = username
        self.fullname = fullname
        print "DEBUG - NamedUserAvatar - __init__ :", username, fullname

class INamedUserAvatar2(ITelnetProtocol):
    """Variant of the avatar interface derived from ITelnetProtocol.

    Providers should have attributes username and fullname.
    """

class NamedUserAvatar2:
    implements(INamedUserAvatar2)
    def __init__(self, username, fullname):
        self.username = username
        self.fullname = fullname
        print "DEBUG - NamedUserAvatar - __init__ :", username, fullname


class TestRealm:
    print "DEBUG - class TestRealm"
    implements(portal.IRealm)
    print "DEBUG - class TestRealm - after implements"

    def __init__(self, users):
        print "DEBUG - class TestRealm - __init__ users", users
        self.users = users

    def requestAvatar(self, avatarId, mind, *interfaces):
        print "DEBUG - class TestRealm - requestAvatar"
        print "*interfaces", interfaces
        if INamedUserAvatar in interfaces:
            print "DEBUG: requestAvatar - avatarId :", avatarId
            print "DEBUG: requestAvatar - self.users[avatarId] :", self.users[avatarId]
            fullname = self.users[avatarId]
            logout = lambda: None
            print "DEBUG: INamedUserAvatar :",INamedUserAvatar
            print "DEBUG: NamedUserAvatar(avatarId, fullname) :", NamedUserAvatar(avatarId, fullname)
            return (INamedUserAvatar, 
                    NamedUserAvatar(avatarId, fullname),

                    logout)
        elif INamedUserAvatar2 in interfaces:
            print "DEBUG2: requestAvatar - avatarId :", avatarId
            print "DEBUG2: requestAvatar - self.users[avatarId] :", self.users[avatarId]
            fullname = self.users[avatarId]
            logout = lambda: None
            print "DEBUG2: INamedUserAvatar :",ITelnetProtocol
            print "DEBUG2: NamedUserAvatar(avatarId, fullname) :", TelnetProtocol(avatarId, fullname)
            return (INamedUserAvatar2, NamedUserAvatar2(avatarId, fullname), logout)
        else:
            print "DEBUG: requestAvatar - requestAvatar -else :", avatarId
            raise KeyError("None of the requested interfaces is supported")

class LoginTestProtocol000(basic.LineReceiver):
    """Line-based login dialogue: ask for a user name, then a password.

    `currentCommand` names the pending step; each received line is passed to
    the matching `handle_<step>` method. After a successful login the user is
    greeted, logged out again and disconnected.
    """

    def connectionMade(self):
        """Prompt for the user name and wait for it."""
        self.transport.write("User Name: ")
        self.currentCommand = 'user'

    def lineReceived(self, line):
        """Dispatch the stripped line to the handler of the current step."""
        handler = getattr(self, 'handle_' + self.currentCommand)
        handler(line.strip())

    def handle_user(self, username):
        """Remember the user name and prompt for the password."""
        self.username = username
        self.transport.write("Password: ")
        self.currentCommand = 'pass'

    def handle_pass(self, password):
        """Attempt a portal login with the collected user name and password."""
        creds = credentials.UsernamePassword(self.username, password)
        login_attempt = self.factory.portal.login(creds, None, INamedUserAvatar)
        login_attempt.addCallback(self._loginSucceeded)
        login_attempt.addErrback(self._loginFailed)

    def _loginSucceeded(self, avatarInfo):
        """Greet the user by full name, then log out immediately."""
        _interface, avatar, logout = avatarInfo
        self.transport.write("Welcome %s!\r\n" % avatar.fullname)
        logging_out = defer.maybeDeferred(logout)
        logging_out.addBoth(self._logoutFinished)

    def _logoutFinished(self, result):
        """Close the connection once logout has completed either way."""
        self.transport.loseConnection()

    def _loginFailed(self, failure):
        """Report the failure reason to the client and disconnect."""
        reason = failure.getErrorMessage()
        self.transport.write("Denied: %s.\r\n" % reason)
        self.transport.loseConnection()

class LoginTestProtocol(AuthenticatingTelnetProtocol):
    print "DEBUG: LoginTestProtocol"

class LoginTestFactory(protocol.ServerFactory):
    protocol = LoginTestProtocol

    def __init__(self, portal):
        print "DEBUG: LoginTestFactory - __init__ - portal:"
        self.portal = portal
        print "DEBUG: LoginTestFactory - __init__ - portal after", repr(self.portal)
        


# Demo accounts: username -> full name shown after login.
users = dict([
    ('admin', 'Admin User'),
    ('user1', 'Joe Smith'),
    ('user2', 'Bob King'),
])

# Demo accounts: username -> plain-text password (example data only).
passwords = dict([
    ('admin', 'aaa'),
    ('user1', 'bbb'),
    ('user2', 'ccc'),
])

if __name__ == "__main__":
    # Wire realm and checker into a portal, then serve logins on port 2323.
    login_portal = portal.Portal(TestRealm(users))
    password_checker = PasswordDictChecker(passwords)
    login_portal.registerChecker(password_checker)
    reactor.listenTCP(2323, LoginTestFactory(login_portal))
    reactor.run()

And here is the error message shown when a connection is attempted on the server:

D:\workspace\Twisted_example>Telnet_Server_with_auth.py
DEBUG - INamedUserAvatar : <InterfaceClass zope.interface.Interface>
DEBUG - class TestRealm
DEBUG - class TestRealm - after implements
DEBUG: LoginTestProtocol
DEBUG - class TestRealm - __init__ users {'admin': 'Admin User', 'user2': 'Bob King', 'user1': 'Joe Smith'}
DEBUG - PasswordDictChecker - __init__
DEBUG - PasswordDictChecker - self.passwords {'admin': 'aaa', 'user2': 'ccc', 'user1': 'bbb'}
DEBUG: LoginTestFactory - __init__ - portal:
DEBUG: LoginTestFactory - __init__ - portal after <twisted.cred.portal.Portal instance at 0x00F11CD8>
Traceback (most recent call last):
  File "C:\Python25\lib\site-packages\twisted\python\log.py", line 69, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python25\lib\site-packages\twisted\python\context.py", line 59, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python25\lib\site-packages\twisted\python\context.py", line 37, in callWithContext
    return func(*args,**kw)
  File "C:\Python25\lib\site-packages\twisted\internet\selectreactor.py", line 146, in _doReadOrWrite
    why = getattr(selectable, method)()
--- <exception caught here> ---
  File "C:\Python25\lib\site-packages\twisted\internet\tcp.py", line 932, in doRead
    protocol = self.factory.buildProtocol(self._buildAddr(addr))
  File "C:\Python25\lib\site-packages\twisted\internet\protocol.py", line 98, in buildProtocol
    p = self.protocol()
exceptions.TypeError: __init__() takes exactly 2 arguments (1 given)

Thanks for your help

That is getting really cool, nice work.
But really, I don't know what's wrong with that.

Dan08.

Be a part of the DaniWeb community

We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.