""" Base classes for clients and connections of x/84. """
import array
import errno
import logging
import socket
import threading
import time
import warnings
# local
from x84.bbs.exception import Disconnected
from x84.terminal import spawn_client_session
[docs]class BaseClient(object):
"""
Base class for remote client implementations.
Instantiated by the corresponding :class:`BaseServer` class.
"""
#: Override in subclass: a general string identifier for the
#: connecting protocol (for example, 'telnet', 'ssh', 'rlogin')
kind = None
#: maximum unit of data received for each call to socket_recv()
BLOCKSIZE_RECV = 64
#: terminal type identifier when not yet negotiated
TTYPE_UNDETECTED = 'unknown'
def __init__(self, sock, address_pair, on_naws=None):
""" Class initializer. """
self.log = logging.getLogger(self.__class__.__name__)
self.sock = sock
self.address_pair = address_pair
self.on_naws = on_naws
self.active = True
self.env = dict([('TERM', self.TTYPE_UNDETECTED),
('LINES', 24),
('COLUMNS', 80),
('connection-type', self.kind),
])
self.send_buffer = array.array('c')
self.recv_buffer = array.array('c')
self.bytes_received = 0
self.connect_time = time.time()
self.last_input_time = time.time()
[docs] def close(self):
""" Close connection with the client. """
self.shutdown()
[docs] def fileno(self):
""" File descriptor number of socket. """
try:
return self.sock.fileno()
except socket.error:
return None
[docs] def recv_ready(self):
"""
Subclass and implement: whether socket_recv() should be called.
:raises NotImplementedError
"""
raise NotImplementedError()
[docs] def send(self):
"""
Send any data buffered and return number of bytes send.
:raises Disconnected: client has disconnected (cannot write to socket).
"""
if not self.send_ready():
warnings.warn('send() called on empty buffer', RuntimeWarning, 2)
return 0
ready_bytes = bytes(''.join(self.send_buffer))
self.send_buffer = array.array('c')
def _send(send_bytes):
"""
Inner low-level function for socket send.
:raises Disconnected: on sock.send error.
"""
try:
return self.sock.send(send_bytes)
except socket.error as err:
if err.errno in (errno.EDEADLK, errno.EAGAIN):
self.log.debug('{self.addrport}: {err} (bandwidth exceed)'
.format(self=self, err=err))
return 0
raise Disconnected('send: {0}'.format(err))
sent = _send(ready_bytes)
if sent < len(ready_bytes):
# re-buffer data that could not be pushed to socket;
self.send_buffer.fromstring(ready_bytes[sent:])
return sent
[docs] def send_ready(self):
""" Whether any data is buffered for delivery. """
return bool(self.send_buffer.__len__())
[docs] def shutdown(self):
"""
Shutdown and close socket.
Called by event loop after client is marked by :meth:`deactivate`.
"""
try:
self.sock.shutdown(socket.SHUT_RDWR)
self.log.debug('{self.addrport}: socket shutdown '
'{self.__class__.__name__}'.format(self=self))
except socket.error:
pass
self.active = False
self.sock.close()
[docs] def socket_recv(self):
"""
Receive data from socket, returns number of bytes received.
:raises Disconnect: client has disconnected.
:rtype: int
"""
try:
data = self.sock.recv(self.BLOCKSIZE_RECV)
recv = len(data)
if recv == 0:
raise Disconnected('Closed by client (EOF)')
except socket.error as err:
if err.errno == errno.EWOULDBLOCK:
return 0
raise Disconnected('socket_recv error: {0}'.format(err))
self.bytes_received += recv
self.last_input_time = time.time()
self.recv_buffer.fromstring(data)
return recv
[docs] def send_str(self, bstr):
""" Buffer bytestring for client. """
self.send_buffer.fromstring(bstr)
[docs] def send_unicode(self, ucs, encoding='utf8'):
""" Buffer unicode string, encoded for client as 'encoding'. """
self.send_str(ucs.encode(encoding, 'replace'))
[docs] def is_active(self):
""" Whether this connection is active (bool). """
return self.active
[docs] def deactivate(self):
""" Flag client for disconnection by engine loop. """
if self.active:
self.active = False
self.log.debug('{self.addrport}: deactivated'.format(self=self))
[docs] def idle(self):
""" Time elapsed since data was last received. """
return time.time() - self.last_input_time
[docs] def duration(self):
""" Time elapsed since connection was made. """
return time.time() - self.connect_time
@property
def addrport(self):
""" IP address and port of connection as string (ip:port). """
return '%s:%d' % (self.address_pair[0], self.address_pair[1])
[docs]class BaseConnect(threading.Thread):
""" Base class for client connect factories. """
#: whether this thread is completed. Set to ``True`` to cause an on-connect
#: thread to forcefully exit.
stopped = False
def __init__(self, client):
""" Class initializer. """
self.client = client
threading.Thread.__init__(self)
self.log = logging.getLogger(self.__class__.__name__)
[docs] def banner(self):
""" Write data on-connect, callback from :meth:`run`. """
pass
[docs] def run(self):
"""
Negotiate a connecting session.
In the case of telnet and ssh, for example, negotiates and
inquires about terminal type, telnet options, window size,
and tcp socket options before spawning a new session.
"""
try:
self._set_socket_opts()
self.banner()
if self.client.is_active():
return spawn_client_session(client=self.client)
except (Disconnected, socket.error) as err:
self.log.debug('Connection closed: %s', err)
finally:
self.stopped = True
self.client.deactivate()
def _set_socket_opts(self):
"""
Set socket non-blocking and enable TCP KeepAlive.
Callback from :meth:`run`.
"""
self.client.sock.setblocking(0)
self.client.sock.setsockopt(
socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)