# This is a simple server that listens on both TCP and UDP ports and logs incoming messages to a file & console.
# To use this with LogLib, simply start the server and configure the LogLib client to send logs to the server.
# The following environment variables can be used to configure LogLib to send logs to the server:
#
#  - LOGLIB_UDP_ENABLED=true (enable UDP logging)
#  - LOGLIB_UDP_HOST=0.0.0.0 (UDP host)
#  - LOGLIB_UDP_PORT=5131 (UDP port, default 5131)
#
# This server is designed to only accept JSON-formatted log messages using the UnixTimestamp for the Timestamps,
# TraceLevels can be anything.
#
# After configuring the environment variables, start the server & run your program. Logging events should be
# sent to the server and logged to the console and a file in the specified working directory.

import argparse
import json
import logging
import os
import socket
import threading
from datetime import datetime
from queue import Queue, Empty
from enum import Enum
from typing import Optional, Dict, Any, List
from dataclasses import dataclass

try:
    import colorama
    from colorama import Fore, Back, Style
except ImportError:
    # Colour is purely cosmetic: fall back to empty escape codes so the server
    # still runs (uncoloured) when colorama is not installed.
    colorama = None

    class _NoColor:
        """Stand-in for colorama's Fore/Back/Style that yields empty strings."""

        def __getattr__(self, name: str) -> str:
            if name.startswith('_'):
                raise AttributeError(name)
            return ""

    Fore = Back = Style = _NoColor()
else:
    colorama.init()


def _optional_int(value: Any) -> Optional[int]:
    """Return ``int(value)``, or None when value is None or not convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _optional_str(value: Any) -> Optional[str]:
    """Return ``str(value)`` for truthy values, otherwise None."""
    return str(value) if value else None


class LogLevel(str, Enum):
    """Short level codes found in the ``level`` field of incoming events."""

    DEBUG = "DBG"
    VERBOSE = "VRB"
    INFO = "INFO"
    WARNING = "WRN"
    ERROR = "ERR"
    CRITICAL = "CRT"

    @classmethod
    def to_python_level(cls, level: str) -> int:
        """Map a level code to a ``logging`` level; unknown codes map to INFO."""
        if not isinstance(level, str):
            # Unhashable JSON values (lists, dicts) must not blow up the lookup.
            return logging.INFO
        return {
            cls.DEBUG: logging.DEBUG,
            cls.VERBOSE: logging.DEBUG,
            cls.INFO: logging.INFO,
            cls.WARNING: logging.WARNING,
            cls.ERROR: logging.ERROR,
            cls.CRITICAL: logging.CRITICAL,
        }.get(level, logging.INFO)

    @classmethod
    def get_color(cls, level: str) -> str:
        """Return the console colour prefix for a level code (white if unknown)."""
        if not isinstance(level, str):
            return Fore.WHITE
        return {
            cls.DEBUG: Fore.CYAN,
            cls.VERBOSE: Fore.BLUE,
            cls.INFO: Fore.GREEN,
            cls.WARNING: Fore.YELLOW,
            cls.ERROR: Fore.RED,
            cls.CRITICAL: Fore.RED + Back.WHITE,
        }.get(level, Fore.WHITE)


@dataclass
class StackFrame:
    """One frame of an exception stack trace received from a client."""

    file: Optional[str]
    line: Optional[int]
    function: Optional[str]
    args: Optional[List[Any]]
    class_name: Optional[str]
    call_type: str = 'static'

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'StackFrame':
        """Build a frame from its JSON dict, tolerating missing or malformed fields.

        Reads the keys ``file``, ``line``, ``function``, ``args``, ``class`` and
        ``callType``. A ``line`` that cannot be converted to int becomes None
        instead of raising; a non-list ``args`` value is wrapped in a list.
        """
        raw_args = data.get('args')
        if raw_args is None:
            args = None
        elif isinstance(raw_args, (list, tuple)):
            args = list(raw_args)
        else:
            args = [raw_args]

        return cls(
            file=_optional_str(data.get('file')),
            line=_optional_int(data.get('line')),
            function=_optional_str(data.get('function')),
            args=args,
            class_name=_optional_str(data.get('class')),
            # "or" also covers an explicit null, which used to render as "None".
            call_type=str(data.get('callType') or 'static'),
        )

    def format(self) -> str:
        """Render the frame as ``<call>(<args>) in <file>:<line>`` with colours."""
        location = f"{self.file or '?'}:{self.line or '?'}"
        if self.class_name:
            call = f"{self.class_name}{self.call_type}{self.function or ''}"
        else:
            call = self.function or ''

        args_str = ""
        if self.args:
            args_str = f"({', '.join(str(arg) for arg in self.args)})"

        return f"{Fore.BLUE}{call}{Style.RESET_ALL}{args_str} in {Fore.CYAN}{location}{Style.RESET_ALL}"


class ExceptionDetails:
    """An exception reported by a client, optionally chained via ``previous``."""

    # The numeric code suffix was hard-wired off in the header ("... and 0");
    # this flag keeps that behaviour while making it explicit and switchable.
    SHOW_CODE = False

    def __init__(self, name: str, message: str, code: Optional[int], file: Optional[str],
                 line: Optional[int], trace: List[StackFrame], previous: Optional['ExceptionDetails']):
        self.name = name
        self.message = message
        self.code = code
        self.file = file
        self.line = line
        self.trace = trace
        self.previous = previous

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Optional['ExceptionDetails']:
        """Build the exception chain from its JSON dict.

        Returns None when ``data`` is empty or not a dict. Non-dict entries in
        ``trace`` are skipped, and ``code``/``line`` values that are not
        convertible to int become None rather than raising.
        """
        if not data or not isinstance(data, dict):
            return None

        trace: List[StackFrame] = []
        raw_trace = data.get('trace')
        if isinstance(raw_trace, list):
            trace = [StackFrame.from_dict(frame) for frame in raw_trace if isinstance(frame, dict)]

        previous = None
        raw_previous = data.get('previous')
        if isinstance(raw_previous, dict):
            previous = cls.from_dict(raw_previous)

        return cls(
            name=str(data.get('name', '')),
            message=str(data.get('message', '')),
            code=_optional_int(data.get('code')),
            file=_optional_str(data.get('file')),
            line=_optional_int(data.get('line')),
            trace=trace,
            previous=previous,
        )

    def format(self, level: int = 0) -> str:
        """Render the exception, its stack trace and its causes as coloured text.

        ``level`` is the nesting depth; each "Caused by" level is indented one
        step further than its parent.
        """
        indent = " " * level
        parts = []

        # Exception header
        header = f"{indent}{Fore.RED}{self.name}"
        if self.SHOW_CODE and self.code is not None:
            header += f" {Fore.YELLOW}:{self.code}{Style.RESET_ALL}"

        # Message
        header += f"{Fore.WHITE}:{Style.RESET_ALL} {self.message}"

        # Location
        if self.file and self.line:
            header += f"{Fore.WHITE} at {Style.RESET_ALL}{self.file}:{self.line}"
        parts.append(header)

        # Stack trace
        if self.trace:
            parts.append(f"{indent}{Fore.WHITE}Stack trace:{Style.RESET_ALL}")
            for frame in self.trace:
                parts.append(f"{indent} → {frame.format()}")

        # Previous exception
        if self.previous:
            parts.append(f"{indent}{Fore.YELLOW}Caused by:{Style.RESET_ALL}")
            parts.append(self.previous.format(level + 1))

        return "\n".join(parts)


class ColoredLogger(logging.Logger):
    """Logger that attaches its own console handler with a coloured format."""

    def __init__(self, name: str):
        super().__init__(name)
        self.formatter = logging.Formatter(
            f'%(asctime)s {Fore.WHITE}[%(levelname)s]{Style.RESET_ALL} %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(self.formatter)
        self.addHandler(console_handler)


class MultiProtocolServer:
    """Log sink that accepts events on the same port over TCP and UDP.

    Each received payload is echoed to the console and appended as one JSON
    line to ``log<YYYY-MM-DD>.jsonl`` inside the working directory.
    """

    # Seconds a blocking accept()/recvfrom() waits before re-checking stop_event.
    _POLL_INTERVAL = 1.0
    # Bytes read per TCP recv() call.
    _TCP_BUFFER_SIZE = 4096
    # Large enough for any single UDP datagram, so payloads are never truncated.
    _UDP_BUFFER_SIZE = 65535
    # Seconds stop() waits for the writer thread to drain the queue.
    _WRITER_JOIN_TIMEOUT = 5.0
    _TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, host: str, port: int, working_directory: str):
        """Create the server; nothing is bound until start() is called.

        Creates ``working_directory`` if it does not exist.
        """
        self.host = host
        self.port = port
        self.working_directory = working_directory
        self.log_queue: Queue = Queue()
        self.current_date = datetime.now().strftime('%Y-%m-%d')
        self.log_file = None
        self.stop_event = threading.Event()
        self._writer_thread: Optional[threading.Thread] = None

        os.makedirs(self.working_directory, exist_ok=True)

        # Set up colored logging. The logger class is a process-wide setting,
        # so restore the previous one once our logger has been created.
        previous_class = logging.getLoggerClass()
        logging.setLoggerClass(ColoredLogger)
        try:
            self.logger = logging.getLogger("MultiProtocolServer")
        finally:
            logging.setLoggerClass(previous_class)
        self.logger.setLevel(logging.DEBUG)

    @classmethod
    def _format_timestamp(cls, raw: Any) -> str:
        """Format a Unix timestamp, falling back to the current time.

        The fallback is used when ``raw`` is missing/falsy, not numeric, or
        outside the range the platform can convert.
        """
        if raw:
            try:
                return datetime.fromtimestamp(int(raw)).strftime(cls._TIMESTAMP_FORMAT)
            except (ValueError, TypeError, OverflowError, OSError):
                pass
        return datetime.now().strftime(cls._TIMESTAMP_FORMAT)

    def _handle_log_event(self, data: Dict[str, Any], address: tuple) -> None:
        """Process and format a structured log event with colors and proper formatting."""
        try:
            app_name = data.get('application_name', 'Unknown')
            timestamp = self._format_timestamp(data.get('timestamp'))
            level = data.get('level', 'INFO')
            message = data.get('message', '')

            # Format the log message with colors
            color = LogLevel.get_color(level)
            log_message = f"{color}[{app_name}]{Style.RESET_ALL} {message}"

            # Handle exception if present (from_dict returns None for non-dicts)
            exception = ExceptionDetails.from_dict(data.get('exception'))
            if exception:
                log_message += f"\n{exception.format()}"

            # Log with appropriate level
            python_level = LogLevel.to_python_level(level)
            self.logger.log(python_level, log_message)

            # Add to log queue for file logging
            self.log_queue.put({
                "timestamp": timestamp,
                "address": address,
                "data": data
            })
        except Exception as e:
            self.logger.error(f"Error processing log event: {e}", exc_info=True)

    def _handle_data(self, data: bytes, address: tuple) -> None:
        """Process incoming data and attempt to parse as JSON.

        JSON objects are handled as structured log events. Anything else
        (invalid JSON, or JSON that is not an object) is logged and queued as
        raw text so it is not lost.
        """
        try:
            # errors='replace' keeps payloads with stray bytes instead of dropping them.
            decoded_data = data.decode('utf-8', errors='replace').strip()
            try:
                json_data = json.loads(decoded_data)
            except json.JSONDecodeError:
                json_data = None
                # Log raw data if not valid JSON
                self.logger.info(f"Received non-JSON data from {address}: {decoded_data}")
            else:
                if isinstance(json_data, dict):
                    # Handle structured log event
                    self._handle_log_event(json_data, address)
                    return
                self.logger.info(f"Received non-object JSON data from {address}: {decoded_data}")

            self.log_queue.put({
                "timestamp": datetime.now().isoformat(),
                "address": address,
                "data": decoded_data
            })
        except Exception as e:
            self.logger.error(f"Data handling error: {e}")

    def _get_log_file(self):
        """Return the open log file for today, rotating when the date changes."""
        date = datetime.now().strftime('%Y-%m-%d')
        if date != self.current_date or self.log_file is None:
            if self.log_file:
                self.log_file.close()
            self.current_date = date
            filename = os.path.join(self.working_directory, f"log{date}.jsonl")
            self.log_file = open(filename, 'a', encoding='utf-8')
        return self.log_file

    def _log_writer(self):
        """Drain the queue to the log file until stopped and the queue is empty."""
        while not self.stop_event.is_set() or not self.log_queue.empty():
            try:
                data = self.log_queue.get(timeout=1)
                # Serialise first so a failure cannot leave a partial line behind.
                line = json.dumps(data)
                log_file = self._get_log_file()
                log_file.write(line + '\n')
                log_file.flush()
            except Empty:
                continue
            except Exception as e:
                self.logger.error(f"Error writing to log file: {e}")

    def _handle_tcp_client(self, client_socket, address):
        """Read from one TCP connection until the peer closes it.

        NOTE: every recv() chunk is passed on as one complete message; there is
        no framing, so a message split across chunks is handled piecewise.
        """
        self.logger.info(f"TCP connection established from {address}")
        try:
            with client_socket:
                while True:
                    data = client_socket.recv(self._TCP_BUFFER_SIZE)
                    if not data:
                        break
                    self._handle_data(data, address)
        except Exception as e:
            self.logger.error(f"TCP client error: {e}")
        self.logger.info(f"TCP connection closed from {address}")

    def _handle_udp_client(self, data, address):
        """Process a single UDP datagram."""
        try:
            self._handle_data(data, address)
        except Exception as e:
            self.logger.error(f"UDP client error: {e}")

    def _start_tcp_server(self):
        """Accept TCP connections until stop_event is set, one thread per client."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
            tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            tcp_socket.bind((self.host, self.port))
            tcp_socket.listen(5)
            # Time out periodically so the loop notices stop_event.
            tcp_socket.settimeout(self._POLL_INTERVAL)
            self.logger.debug(f"TCP server running on {self.host}:{self.port}")
            while not self.stop_event.is_set():
                try:
                    client_socket, address = tcp_socket.accept()
                except socket.timeout:
                    continue
                except Exception as e:
                    self.logger.error(f"TCP server error: {e}")
                    continue
                threading.Thread(
                    target=self._handle_tcp_client,
                    args=(client_socket, address),
                    daemon=True
                ).start()

    def _start_udp_server(self):
        """Receive UDP datagrams until stop_event is set."""
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
            udp_socket.bind((self.host, self.port))
            # Time out periodically so the loop notices stop_event.
            udp_socket.settimeout(self._POLL_INTERVAL)
            self.logger.debug(f"UDP server running on {self.host}:{self.port}")
            while not self.stop_event.is_set():
                try:
                    data, address = udp_socket.recvfrom(self._UDP_BUFFER_SIZE)
                except socket.timeout:
                    continue
                except Exception as e:
                    self.logger.error(f"UDP server error: {e}")
                    continue
                self._handle_udp_client(data, address)

    def start(self):
        """Start the writer and both listeners, then block until interrupted."""
        self.logger.info("Starting MultiProtocolServer...")
        self._writer_thread = threading.Thread(target=self._log_writer, daemon=True)
        self._writer_thread.start()

        tcp_thread = threading.Thread(target=self._start_tcp_server, daemon=True)
        udp_thread = threading.Thread(target=self._start_udp_server, daemon=True)
        tcp_thread.start()
        udp_thread.start()

        try:
            tcp_thread.join()
            udp_thread.join()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Signal all threads to stop, flush queued entries and close the log file."""
        self.logger.info("Stopping MultiProtocolServer...")
        self.stop_event.set()

        # Let the writer drain the queue before the file is closed underneath it.
        writer = self._writer_thread
        if writer is not None and writer.is_alive() and writer is not threading.current_thread():
            writer.join(timeout=self._WRITER_JOIN_TIMEOUT)

        if self.log_file:
            self.log_file.close()
            self.log_file = None


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="MultiProtocol Server")
    parser.add_argument("-p", "--port", type=int, default=5131, help="Port to listen on")
    parser.add_argument("-w", "--working-directory", type=str, default="./logs",
                        help="Directory to store log files")
    args = parser.parse_args()

    server = MultiProtocolServer("0.0.0.0", args.port, args.working_directory)
    server.start()