diff --git a/ts3/__init__.py b/ts3/__init__.py index 77f6c0d..cf21772 100644 --- a/ts3/__init__.py +++ b/ts3/__init__.py @@ -25,368 +25,11 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import time -import telnetlib -import logging -from threading import Lock - +from protocol import TS3Proto, TS3Response, ConnectionError, NoConnectionError, InvalidArguments +from server import TS3Server from defines import * __version__ = "0.1" __license__ = "BSD 3-Clause" __copyright__ = "Copyright 2011, Andrew Williams" -__author__ = "Andrew Williams, Krzysztof Jagiello" - -class ConnectionError(Exception): - def __init__(self, ip, port): - self.ip = ip - self.port = port - - def __str__(): - return 'Error connecting to host %s port %s.' % (self.ip, self.port,) - -class NoConnection(Exception): - def __str__(): - return 'No connection established.' % (self.ip, self.port,) - -class InvalidArguments(ValueError): - """ - Raised when a abstracted function has received invalid arguments - """ - -ts3_escape = [ - (chr(92), r'\\'), # \ - (chr(47), r"\/"), # / - (chr(32), r'\s'), # Space - (chr(124), r'\p'), # | - (chr(7), r'\a'), # Bell - (chr(8), r'\b'), # Backspace - (chr(12), r'\f'), # Formfeed - (chr(10), r'\n'), # Newline - (chr(13), r'\r'), # Carrage Return - (chr(9), r'\t'), # Horizontal Tab - (chr(11), r'\v'), # Vertical tab -] - - -class TS3Response(): - def __init__(self, response, data): - self.response = TS3Proto.parse_response(response) - self.data = TS3Proto.parse_data(data) - - if isinstance(self.data, dict): - if self.data: - self.data = [self.data] - else: - self.data = [] - - @property - def is_successful(self): - return self.response['msg'] == 'ok' - -class TS3Proto(): - - io_lock = Lock() - - @property - def logger(self): - if not hasattr(self, "_logger"): - self._logger = logging.getLogger(__name__) - return self._logger - - def connect(self, ip, port=10011, timeout=5): - self.io_lock.acquire() - try: - self._telnet = telnetlib.Telnet(ip, port) - except telnetlib.socket.error: - raise ConnectionError(ip, port) - - self._timeout = timeout - self._connected = False - - data = self._telnet.read_until("\n\r", self._timeout) - self.io_lock.release() - - if data.endswith("TS3\n\r"): - self._connected = True - - return self._connected - - def disconnect(self): - self.check_connection() - - self.send_command("quit") - self._telnet.close() - - self._connected = False - - def send_command(self, command, keys=None, opts=None): - self.check_connection() - - commandstr = self.construct_command(command, keys=keys, opts=opts) - self.logger.debug("send_command - %s" % commandstr) - - self.io_lock.acquire() - self._telnet.write("%s\n\r" % commandstr) - - data = "" - response = self._telnet.read_until("\n\r", self._timeout) - self.io_lock.release() - - if not response.startswith("error"): - # what we just got was extra data - data = response - response = self._telnet.read_until("\n\r", self._timeout) - - return TS3Response(response, data) - - def check_connection(self): - if not self.is_connected: - raise NoConnectionError - - def is_connected(self): - return self._connected - - def construct_command(self, command, keys=None, opts=None): - """ - Constructs a TS3 formatted command string - - Keys can have a single nested list to construct a nested parameter - - @param command: Command list - @type command: string - @param keys: Key/Value pairs - @type keys: dict - @param opts: Options - @type opts: list - """ - - cstr = [command] - - # Add the keys and values, escape as needed - if keys: - for key in keys: - if isinstance(keys[key], list): - ncstr = [] - for nest in keys[key]: - ncstr.append("%s=%s" % (key, self._escape_str(nest))) - cstr.append("|".join(ncstr)) - else: - cstr.append("%s=%s" % (key, self._escape_str(keys[key]))) - - # Add in options - if opts: - for opt in opts: - cstr.append("-%s" % opt) - - return " ".join(cstr) - - @staticmethod - def parse_response(response): - """ - Parses a TS3 command string into command/keys/opts tuple - - @param command: Command string - @type command: string - """ - - # responses always begins with "error " so we may just skip it - return TS3Server.parse_data(response[6:]) - - @staticmethod - def parse_data(data): - """ - Parses data string consisting of key=value - - @param data: data string - @type data: string - """ - - data = data.strip() - - multipart = data.split('|') - - if len(multipart) > 1: - values = [] - - for part in multipart: - values.append(TS3Proto.parse_data(part)) - return values - - chunks = data.split(' ') - parsed_data = {} - - for chunk in chunks: - chunk = chunk.strip().split('=') - - if len(chunk) > 1: - if len(chunk) > 2: - # value can contain '=' which may confuse our parser - chunk = [chunk[0], '='.join(chunk[1:])] - - key, value = chunk - parsed_data[key] = TS3Proto._unescape_str(value) - else: - # TS3 Query Server may sometimes return a key without any value - # and we default its value to None - parsed_data[chunk[0]] = None - - return parsed_data - - @staticmethod - def _escape_str(value): - """ - Escape a value into a TS3 compatible string - - @param value: Value - @type value: string/int - - """ - - if isinstance(value, int): - return str(value) - - for i, j in ts3_escape: - value = value.replace(i, j) - - return value - - @staticmethod - def _unescape_str(value): - """ - Unescape a TS3 compatible string into a normal string - - @param value: Value - @type value: string/int - - """ - - if isinstance(value, int): - return str(value) - - for i, j in ts3_escape: - value = value.replace(j, i) - - return value - - -class TS3Server(TS3Proto): - def __init__(self, ip=None, port=10011, id=0): - """ - Abstraction class for TS3 Servers - - @param ip: IP Address - @type ip: str - @param port: Port Number - @type port: int - - """ - if ip and port: - if self.connect(ip, port) and id > 0: - self.use(id) - - @property - def logger(self): - if not hasattr(self, "_logger"): - self._logger = logging.getLogger(__name__) - return self._logger - - def login(self, username, password): - """ - Login to the TS3 Server - - @param username: Username - @type username: str - @param password: Password - @type password: str - """ - - response = self.send_command('login', keys={'client_login_name': username, 'client_login_password': password }) - return response.is_successful - - def serverlist(self): - """ - Get a list of all Virtual Servers on the connected TS3 instance - """ - return self.send_command('serverlist') - - def gm(self, msg): - """ - Send a global message to the current Virtual Server - - @param msg: Message - @type ip: str - """ - response = self.send_command('gm', keys={'msg': msg}) - return response.is_successful - - def use(self, id): - """ - Use a particular Virtual Server instance - - @param id: Virtual Server ID - @type id: int - """ - response = self.send_command('use', keys={'sid': id}) - return response.is_successful - - def clientlist(self): - """ - Returns a clientlist of the current connected server/vhost - """ - - response = self.send_command('clientlist') - - if response.is_successful: - clientlist = {} - for client in response.data: - clientlist[client['clid']] = client - return clientlist - else: - # TODO: Raise a exception? - self.logger.debug("clientlist - error retrieving client list") - return {} - - def clientkick(self, clid=None, cldbid=None, type=REASON_KICK_SERVER, message=None): - """ - Kicks a user identified by either clid or cldbid - """ - - client = None - if cldbid: - clientlist = self.send_command('clientlist') - for cl in clientlist.values(): - if int(cl['client_database_id']) == cldbid: - client = cl['clid'] - self.logger.debug("clientkick - identified user from clid (%s = %s)" % (cldbid, client)) - break - - if not client: - # TODO: we should throw an exception here actually - self.logger.debug("clientkick - no client with specified cldbid (%s) was found" % cldbid) - return False - elif clid: - client = clid - else: - raise InvalidArguments('No clid or cldbid provided') - - if not message: - message = '' - else: - # Kick message can only be 40 characters - message = message[:40] - - if client: - self.logger.debug("clientkick - Kicking clid %s" % client) - response = self.send_command('clientkick', keys={'clid': client, 'reasonid': type, 'reasonmsg': message}) - return response.is_successful - - return False - - def clientpoke(self, clid, message): - """ - Poke a client with the specified message - """ - - response = self.send_command('clientpoke', keys={'clid': clid, 'msg': message}) - return response.is_successful +__author__ = "Andrew Williams, Krzysztof Jagiello" \ No newline at end of file diff --git a/ts3/protocol.py b/ts3/protocol.py new file mode 100644 index 0000000..86571e5 --- /dev/null +++ b/ts3/protocol.py @@ -0,0 +1,270 @@ +# Python TS3 Library (python-ts3) +# +# Copyright (c) 2011, Andrew Williams +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of the nor the +# names of its contributors may be used to endorse or promote products +# derived from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import logging +import telnetlib +from threading import Lock + + +class ConnectionError(Exception): + def __init__(self, ip, port): + self.ip = ip + self.port = port + + def __str__(self): + return 'Error connecting to host %s port %s.' % (self.ip, self.port) + + +class NoConnectionError(Exception): + def __str__(self): + return 'No connection established.' + + +class InvalidArguments(ValueError): + """ + Raised when a abstracted function has received invalid arguments + """ + + +ts3_escape = [ + (chr(92), r'\\'), # \ + (chr(47), r"\/"), # / + (chr(32), r'\s'), # Space + (chr(124), r'\p'), # | + (chr(7), r'\a'), # Bell + (chr(8), r'\b'), # Backspace + (chr(12), r'\f'), # Form Feed + (chr(10), r'\n'), # Newline + (chr(13), r'\r'), # Carriage Return + (chr(9), r'\t'), # Horizontal Tab + (chr(11), r'\v'), # Vertical tab +] + + +class TS3Response(): + def __init__(self, response, data): + self.response = TS3Proto.parse_response(response) + self.data = TS3Proto.parse_data(data) + + if isinstance(self.data, dict): + if self.data: + self.data = [self.data] + else: + self.data = [] + + @property + def is_successful(self): + return self.response['msg'] == 'ok' + + +class TS3Proto(): + + def __init__(self): + self.io_lock = Lock() + self._connected = False + self._timeout = 0 + self._telnet = None + self._logger = logging.getLogger(__name__) + + @property + def logger(self): + return self._logger + + def connect(self, ip, port=10011, timeout=5): + self.io_lock.acquire() + try: + self._telnet = telnetlib.Telnet(ip, port) + except telnetlib.socket.error: + raise ConnectionError(ip, port) + + self._timeout = timeout + self._connected = False + + data = self._telnet.read_until("\n\r", self._timeout) + self.io_lock.release() + + if data.endswith("TS3\n\r"): + self._connected = True + + return self._connected + + def disconnect(self): + self.check_connection() + + self.send_command("quit") + self._telnet.close() + + self._connected = False + + def send_command(self, command, keys=None, opts=None): + self.check_connection() + + commandstr = self.construct_command(command, keys=keys, opts=opts) + self.logger.debug("send_command - %s" % commandstr) + + self.io_lock.acquire() + self._telnet.write("%s\n\r" % commandstr) + + data = "" + response = self._telnet.read_until("\n\r", self._timeout) + self.io_lock.release() + + if not response.startswith("error"): + # what we just got was extra data + data = response + response = self._telnet.read_until("\n\r", self._timeout) + + return TS3Response(response, data) + + def check_connection(self): + if not self.is_connected: + raise NoConnectionError + + def is_connected(self): + return self._connected + + def construct_command(self, command, keys=None, opts=None): + """ + Constructs a TS3 formatted command string + + Keys can have a single nested list to construct a nested parameter + + @param command: Command list + @type command: string + @param keys: Key/Value pairs + @type keys: dict + @param opts: Options + @type opts: list + """ + + cstr = [command] + + # Add the keys and values, escape as needed + if keys: + for key in keys: + if isinstance(keys[key], list): + ncstr = [] + for nest in keys[key]: + ncstr.append("%s=%s" % (key, self._escape_str(nest))) + cstr.append("|".join(ncstr)) + else: + cstr.append("%s=%s" % (key, self._escape_str(keys[key]))) + + # Add in options + if opts: + for opt in opts: + cstr.append("-%s" % opt) + + return " ".join(cstr) + + @staticmethod + def parse_response(response): + """ + Parses a TS3 command string into command/keys/opts tuple + + @param response: Command string + @type response: string + """ + + # responses always begins with "error " so we may just skip it + return TS3Proto.parse_data(response[6:]) + + @staticmethod + def parse_data(data): + """ + Parses data string consisting of key=value + + @param data: data string + @type data: string + """ + + data = data.strip() + + multipart = data.split('|') + + if len(multipart) > 1: + values = [] + + for part in multipart: + values.append(TS3Proto.parse_data(part)) + return values + + chunks = data.split(' ') + parsed_data = {} + + for chunk in chunks: + chunk = chunk.strip().split('=') + + if len(chunk) > 1: + if len(chunk) > 2: + # value can contain '=' which may confuse our parser + chunk = [chunk[0], '='.join(chunk[1:])] + + key, value = chunk + parsed_data[key] = TS3Proto._unescape_str(value) + else: + # TS3 Query Server may sometimes return a key without any value + # and we default its value to None + parsed_data[chunk[0]] = None + + return parsed_data + + @staticmethod + def _escape_str(value): + """ + Escape a value into a TS3 compatible string + + @param value: Value + @type value: string/int + + """ + + if isinstance(value, int): + return str(value) + + for i, j in ts3_escape: + value = value.replace(i, j) + + return value + + @staticmethod + def _unescape_str(value): + """ + Unescape a TS3 compatible string into a normal string + + @param value: Value + @type value: string/int + + """ + + if isinstance(value, int): + return str(value) + + for i, j in ts3_escape: + value = value.replace(j, i) + + return value diff --git a/ts3/server.py b/ts3/server.py new file mode 100644 index 0000000..2620c82 --- /dev/null +++ b/ts3/server.py @@ -0,0 +1,151 @@ +# Python TS3 Library (python-ts3) +# +# Copyright (c) 2011, Andrew Williams +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# * Neither the name of the nor the +# names of its contributors may be used to endorse or promote products +# derived from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import logging +from protocol import TS3Proto, InvalidArguments +from defines import * + + +class TS3Server(TS3Proto): + def __init__(self, ip=None, port=10011, id=0): + """ + Abstraction class for TS3 Servers + + @param ip: IP Address + @type ip: str + @param port: Port Number + @type port: int + + """ + self._logger = logging.getLogger(__name__) + if ip and port: + if self.connect(ip, port) and id > 0: + self.use(id) + + @property + def logger(self): + return self._logger + + def login(self, username, password): + """ + Login to the TS3 Server + + @param username: Username + @type username: str + @param password: Password + @type password: str + """ + + response = self.send_command('login', keys={'client_login_name': username, 'client_login_password': password}) + return response.is_successful + + def serverlist(self): + """ + Get a list of all Virtual Servers on the connected TS3 instance + """ + return self.send_command('serverlist') + + def gm(self, msg): + """ + Send a global message to the current Virtual Server + + @param msg: Message + @type msg: str + """ + response = self.send_command('gm', keys={'msg': msg}) + return response.is_successful + + def use(self, id): + """ + Use a particular Virtual Server instance + + @param id: Virtual Server ID + @type id: int + """ + response = self.send_command('use', keys={'sid': id}) + return response.is_successful + + def clientlist(self): + """ + Returns a clientlist of the current connected server/vhost + """ + + response = self.send_command('clientlist') + + if response.is_successful: + clientlist = {} + for client in response.data: + clientlist[client['clid']] = client + return clientlist + else: + # TODO: Raise a exception? + self.logger.debug("clientlist - error retrieving client list") + return {} + + def clientkick(self, clid=None, cldbid=None, type=REASON_KICK_SERVER, message=None): + """ + Kicks a user identified by either clid or cldbid + """ + + client = None + if cldbid: + clientlist = self.send_command('clientlist') + for cl in clientlist.values(): + if int(cl['client_database_id']) == cldbid: + client = cl['clid'] + self.logger.debug("clientkick - identified user from clid (%s = %s)" % (cldbid, client)) + break + + if not client: + # TODO: we should throw an exception here actually + self.logger.debug("clientkick - no client with specified cldbid (%s) was found" % cldbid) + return False + elif clid: + client = clid + else: + raise InvalidArguments('No clid or cldbid provided') + + if not message: + message = '' + else: + # Kick message can only be 40 characters + message = message[:40] + + if client: + self.logger.debug("clientkick - Kicking clid %s" % client) + response = self.send_command('clientkick', keys={'clid': client, 'reasonid': type, 'reasonmsg': message}) + return response.is_successful + + return False + + def clientpoke(self, clid, message): + """ + Poke a client with the specified message + """ + + response = self.send_command('clientpoke', keys={'clid': clid, 'msg': message}) + return response.is_successful diff --git a/ts3/test.py b/ts3/test.py index f54b7b8..343e25d 100644 --- a/ts3/test.py +++ b/ts3/test.py @@ -1,5 +1,5 @@ import unittest -from __init__ import TS3Proto +from protocol import TS3Proto class TS3ProtoTest(unittest.TestCase): """ Tests the TS3Proto class """