# Python TS3 Library (python-ts3) # # Copyright (c) 2011, Andrew Williams # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the nor the # names of its contributors may be used to endorse or promote products # derived from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging import telnetlib from threading import Lock class ConnectionError(Exception): def __init__(self, ip, port): self.ip = ip self.port = port def __str__(self): return 'Error connecting to host %s port %s.' % (self.ip, self.port) class NoConnectionError(Exception): def __str__(self): return 'No connection established.' class InvalidArguments(ValueError): """ Raised when a abstracted function has received invalid arguments """ ts3_escape = [ (chr(92), r'\\'), # \ (chr(47), r"\/"), # / (chr(32), r'\s'), # Space (chr(124), r'\p'), # | (chr(7), r'\a'), # Bell (chr(8), r'\b'), # Backspace (chr(12), r'\f'), # Form Feed (chr(10), r'\n'), # Newline (chr(13), r'\r'), # Carriage Return (chr(9), r'\t'), # Horizontal Tab (chr(11), r'\v'), # Vertical tab ] class TS3Response(): def __init__(self, response, data): self.response = TS3Proto.parse_response(response) self.data = TS3Proto.parse_data(data) if isinstance(self.data, dict): if self.data: self.data = [self.data] else: self.data = [] @property def is_successful(self): return self.response['msg'] == 'ok' class TS3Proto(): def __init__(self): self.io_lock = Lock() self._connected = False self._timeout = 0 self._telnet = None self._logger = logging.getLogger(__name__) @property def logger(self): return self._logger def connect(self, ip, port=10011, timeout=5): self.io_lock.acquire() try: self._telnet = telnetlib.Telnet(ip, port) except telnetlib.socket.error: raise ConnectionError(ip, port) self._timeout = timeout self._connected = False data = self._telnet.read_until("\n\r", self._timeout) self.io_lock.release() if data.endswith("TS3\n\r"): self._connected = True return self._connected def disconnect(self): self.check_connection() self.send_command("quit") self._telnet.close() self._connected = False def send_command(self, command, keys=None, opts=None): self.check_connection() commandstr = self.construct_command(command, keys=keys, opts=opts) self.logger.debug("send_command - %s" % commandstr) self.io_lock.acquire() self._telnet.write("%s\n\r" % commandstr) data = "" response = self._telnet.read_until("\n\r", self._timeout) self.io_lock.release() if not response.startswith("error"): # what we just got was extra data data = response response = self._telnet.read_until("\n\r", self._timeout) return TS3Response(response, data) def check_connection(self): if not self.is_connected: raise NoConnectionError def is_connected(self): return self._connected def construct_command(self, command, keys=None, opts=None): """ Constructs a TS3 formatted command string Keys can have a single nested list to construct a nested parameter @param command: Command list @type command: string @param keys: Key/Value pairs @type keys: dict @param opts: Options @type opts: list """ cstr = [command] # Add the keys and values, escape as needed if keys: for key in keys: if isinstance(keys[key], list): ncstr = [] for nest in keys[key]: ncstr.append("%s=%s" % (key, self._escape_str(nest))) cstr.append("|".join(ncstr)) else: cstr.append("%s=%s" % (key, self._escape_str(keys[key]))) # Add in options if opts: for opt in opts: cstr.append("-%s" % opt) return " ".join(cstr) @staticmethod def parse_response(response): """ Parses a TS3 command string into command/keys/opts tuple @param response: Command string @type response: string """ # responses always begins with "error " so we may just skip it return TS3Proto.parse_data(response[6:]) @staticmethod def parse_data(data): """ Parses data string consisting of key=value @param data: data string @type data: string """ data = data.strip() multipart = data.split('|') if len(multipart) > 1: values = [] for part in multipart: values.append(TS3Proto.parse_data(part)) return values chunks = data.split(' ') parsed_data = {} for chunk in chunks: chunk = chunk.strip().split('=') if len(chunk) > 1: if len(chunk) > 2: # value can contain '=' which may confuse our parser chunk = [chunk[0], '='.join(chunk[1:])] key, value = chunk parsed_data[key] = TS3Proto._unescape_str(value) else: # TS3 Query Server may sometimes return a key without any value # and we default its value to None parsed_data[chunk[0]] = None return parsed_data @staticmethod def _escape_str(value): """ Escape a value into a TS3 compatible string @param value: Value @type value: string/int """ if isinstance(value, int): return str(value) for i, j in ts3_escape: value = value.replace(i, j) return value @staticmethod def _unescape_str(value): """ Unescape a TS3 compatible string into a normal string @param value: Value @type value: string/int """ if isinstance(value, int): return str(value) for i, j in ts3_escape: value = value.replace(j, i) return value