Source code for logdispatchr.inputs

# -*- coding:utf-8 -*-

import queue
import logging
import msgpack
import threading
import socketserver


from logdispatchr import formatters
from logdispatchr.models import Message

logger = logging.getLogger(__name__)


[docs]class BaseInput(object): """ This class takes stuff from a source, and converts it in the standard internal message format. It is also the base class for any input. :param formatter: A string identifying the formatter to use :param key: the identifier for the messages comming from this source :param max_waiting_messages: the size of the internal queue. :type formatter: str :type key: str :type max_waiting_messages: queue.Queue """ def __init__(self, **kwargs): self.formatter = formatters.get_formatter( kwargs.get('formatter', None)) self.key = kwargs.get('key', 'undefined') self.messages = queue.Queue(kwargs.get('max_waiting_messages', 10))
[docs] def setup(self): """ Any operation needed before actually recieving messages. Should be overriden in child classes if needed. """ pass
# rewrite this as iterators?
[docs] def has_available_message(self): """ :rtype: boolean :return: wether we have messages waiting or not. """ return self.messages.qsize() > 0
[docs] def get(self): """ :return: the next message in the queue :rtype: Message .. seealso:: :doc:`models` """ return self.messages.get()
[docs]class UDPSyslogInput(BaseInput): """ A naïve UDP rsyslog reciever. Doesn't parse the recieved message. :param host: the host to bind to. The most common is 0.0.0.0, to listen to all interfaces, but localhost or 127.0.0.1 are a possibility :type host: str :param port: the port we should listen to :type port: int """ class UDPSyslogServer(socketserver.UDPServer): def __init__(self, host, port, message_queue, key): self.message_queue = message_queue self.key = key super().__init__((host, port), UDPSyslogInput.UDPSyslogHandler) class UDPSyslogHandler(socketserver.DatagramRequestHandler): def handle(self): data = bytes.decode(self.request[0].strip()) # socket = self.request[1] m = Message() m['message'] = str(data) m['key'] = self.server.key logger.debug('got UDP syslog message: %s from %s', m, self.client_address[0]) self.server.message_queue.put(m) def __init__(self, **kwargs): self.host = kwargs.get('host', 'localhost') self.port = kwargs.get('port', 514) super().__init__(**kwargs) self.setup() def setup(self): self.server = UDPSyslogInput.UDPSyslogServer(self.host, self.port, self.messages, self.key) self.serverthread = threading.Thread(target=self.server.serve_forever) self.serverthread.setDaemon(True) self.serverthread.start() logger.info("successfully started Rsyslog server on %s:%s", self.host, self.port)
[docs]class LogdispatchrUDPInput(BaseInput): """ Listens to forwarded messages from other logdispatchr instances. Please see the "models" page of the documentation to know more about the exchange format :param host: the host to bind to. The most common is 0.0.0.0, to listen to all interfaces, but localhost or 127.0.0.1 are a possibility :type host: str :param port: the port we should listen to :type port: int """ class UDPMessagePackServer(socketserver.UDPServer): def __init__(self, host, port, message_queue): self.message_queue = message_queue self.key = None super().__init__((host, port), LogdispatchrUDPInput.UDPMessagePackHandler) class UDPMessagePackHandler(socketserver.DatagramRequestHandler): def handle(self): data = bytes.decode(self.request[0].strip()) # socket = self.request[1] m = Message(msgpack.unpackb(data)) # Don't update the key, since it is a forward. # but let's check for its presence if 'key' not in m: logger.warning('got forwarded message without a key: %s', m) logger.debug('got UDP Logdispatchr message: %s from %s', m, self.client_address[0]) self.server.message_queue.put(m) def __init__(self, **kwargs): self.host = kwargs.get('host', 'localhost') self.port = kwargs.get('port', 5140) super().__init__(**kwargs) self.setup() def setup(self): self.server = LogdispatchrUDPInput.UDPMessagePackServer( self.host, self.port, self.messages) self.serverthread = threading.Thread(target=self.server.serve_forever) self.serverthread.setDaemon(True) self.serverthread.start() logger.info("successfully started Logdispatchr \ message server on %s:%s", self.host, self.port)