loglib2/server.py
2025-01-22 01:02:17 -05:00

358 lines
No EOL
13 KiB
Python

# This is a simple server that listens on both TCP and UDP ports and logs incoming messages to a file & console.
# To use this with LogLib, simply start the server and configure the LogLib client to send logs to the server.
# The following environment variables can be used to configure LogLib to send logs to the server:
#
# - LOGLIB_UDP_ENABLED=true (enable UDP logging)
# - LOGLIB_UDP_HOST=0.0.0.0 (UDP host)
# - LOGLIB_UDP_PORT=5131 (UDP port, default 5131)
#
# This server is designed to only accept JSON-formatted log messages using the UnixTimestamp for the Timestamps,
# TraceLevels can be anything.
#
# After configuring the environment variables, start the server & run your program. Logging events should be
# sent to the server and logged to the console and a file in the specified working directory.
import argparse
import json
import logging
import os
import socket
import threading
from datetime import datetime
from queue import Queue, Empty
from enum import Enum
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import colorama
from colorama import Fore, Back, Style
colorama.init()
class LogLevel(str, Enum):
DEBUG = "DBG"
VERBOSE = "VRB"
INFO = "INFO"
WARNING = "WRN"
ERROR = "ERR"
CRITICAL = "CRT"
@classmethod
def to_python_level(cls, level: str) -> int:
return {
cls.DEBUG: logging.DEBUG,
cls.VERBOSE: logging.DEBUG,
cls.INFO: logging.INFO,
cls.WARNING: logging.WARNING,
cls.ERROR: logging.ERROR,
cls.CRITICAL: logging.CRITICAL
}.get(level, logging.INFO)
@classmethod
def get_color(cls, level: str) -> str:
return {
cls.DEBUG: Fore.CYAN,
cls.VERBOSE: Fore.BLUE,
cls.INFO: Fore.GREEN,
cls.WARNING: Fore.YELLOW,
cls.ERROR: Fore.RED,
cls.CRITICAL: Fore.RED + Back.WHITE
}.get(level, Fore.WHITE)
@dataclass
class StackFrame:
file: Optional[str]
line: Optional[int]
function: Optional[str]
args: Optional[List[Any]]
class_name: Optional[str]
call_type: str = 'static'
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'StackFrame':
return cls(
file=str(data.get('file')) if data.get('file') else None,
line=int(data['line']) if data.get('line') is not None else None,
function=str(data.get('function')) if data.get('function') else None,
args=data.get('args'),
class_name=str(data.get('class')) if data.get('class') else None,
call_type=str(data.get('callType', 'static'))
)
def format(self) -> str:
location = f"{self.file or '?'}:{self.line or '?'}"
if self.class_name:
call = f"{self.class_name}{self.call_type}{self.function or ''}"
else:
call = self.function or ''
args_str = ""
if self.args:
args_str = f"({', '.join(str(arg) for arg in self.args)})"
return f"{Fore.BLUE}{call}{Style.RESET_ALL}{args_str} in {Fore.CYAN}{location}{Style.RESET_ALL}"
class ExceptionDetails:
def __init__(self, name: str, message: str, code: Optional[int],
file: Optional[str], line: Optional[int],
trace: List[StackFrame], previous: Optional['ExceptionDetails']):
self.name = name
self.message = message
self.code = code
self.file = file
self.line = line
self.trace = trace
self.previous = previous
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Optional['ExceptionDetails']:
if not data:
return None
trace = []
if 'trace' in data and isinstance(data['trace'], list):
trace = [StackFrame.from_dict(frame) for frame in data['trace']
if isinstance(frame, dict)]
previous = None
if 'previous' in data and isinstance(data['previous'], dict):
previous = cls.from_dict(data['previous'])
return cls(
name=str(data.get('name', '')),
message=str(data.get('message', '')),
code=int(data['code']) if data.get('code') is not None else None,
file=str(data.get('file')) if data.get('file') else None,
line=int(data['line']) if data.get('line') is not None else None,
trace=trace,
previous=previous
)
def format(self, level: int = 0) -> str:
indent = " " * level
parts = []
# Exception header
header = f"{indent}{Fore.RED}{self.name}"
if self.code is not None and 0:
header += f" {Fore.YELLOW}:{self.code}{Style.RESET_ALL}"
# Message
header += f"{Fore.WHITE}:{Style.RESET_ALL} {self.message}"
# Location
if self.file and self.line:
header += f"{Fore.WHITE} at {Style.RESET_ALL}{self.file}:{self.line}"
parts.append(header)
# Stack trace
if self.trace:
parts.append(f"{indent}{Fore.WHITE}Stack trace:{Style.RESET_ALL}")
for frame in self.trace:
parts.append(f"{indent}{frame.format()}")
# Previous exception
if self.previous:
parts.append(f"{indent}{Fore.YELLOW}Caused by:{Style.RESET_ALL}")
parts.append(self.previous.format(level + 1))
return "\n".join(parts)
class ColoredLogger(logging.Logger):
def __init__(self, name: str):
super().__init__(name)
self.formatter = logging.Formatter(
f'%(asctime)s {Fore.WHITE}[%(levelname)s]{Style.RESET_ALL} %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(self.formatter)
self.addHandler(console_handler)
class MultiProtocolServer:
def __init__(self, host: str, port: int, working_directory: str):
self.host = host
self.port = port
self.working_directory = working_directory
self.log_queue: Queue = Queue()
self.current_date = datetime.now().strftime('%Y-%m-%d')
self.log_file = None
self.stop_event = threading.Event()
os.makedirs(self.working_directory, exist_ok=True)
# Set up colored logging
logging.setLoggerClass(ColoredLogger)
self.logger = logging.getLogger("MultiProtocolServer")
self.logger.setLevel(logging.DEBUG)
def _handle_log_event(self, data: Dict[str, Any], address: tuple) -> None:
"""Process and format a structured log event with colors and proper formatting."""
try:
app_name = data.get('application_name', 'Unknown')
timestamp = data.get('timestamp')
if timestamp:
try:
timestamp = datetime.fromtimestamp(int(timestamp))
timestamp = timestamp.strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
else:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
level = data.get('level', 'INFO')
message = data.get('message', '')
# Format the log message with colors
color = LogLevel.get_color(level)
log_message = f"{color}[{app_name}]{Style.RESET_ALL} {message}"
# Handle exception if present
exception_data = data.get('exception')
if exception_data:
exception = ExceptionDetails.from_dict(exception_data)
if exception:
log_message += f"\n{exception.format()}"
# Log with appropriate level
python_level = LogLevel.to_python_level(level)
self.logger.log(python_level, log_message)
# Add to log queue for file logging
self.log_queue.put({
"timestamp": timestamp,
"address": address,
"data": data
})
except Exception as e:
self.logger.error(f"Error processing log event: {e}", exc_info=True)
def _handle_data(self, data: bytes, address: tuple) -> None:
"""Process incoming data and attempt to parse as JSON."""
try:
decoded_data = data.decode('utf-8').strip()
try:
json_data = json.loads(decoded_data)
# Handle structured log event
self._handle_log_event(json_data, address)
except json.JSONDecodeError:
# Log raw data if not valid JSON
self.logger.info(f"Received non-JSON data from {address}: {decoded_data}")
self.log_queue.put({
"timestamp": datetime.now().isoformat(),
"address": address,
"data": decoded_data
})
except Exception as e:
self.logger.error(f"Data handling error: {e}")
# Rest of the class remains the same...
def _get_log_file(self):
date = datetime.now().strftime('%Y-%m-%d')
if date != self.current_date or self.log_file is None:
if self.log_file:
self.log_file.close()
self.current_date = date
filename = os.path.join(self.working_directory, f"log{date}.jsonl")
self.log_file = open(filename, 'a')
return self.log_file
def _log_writer(self):
while not self.stop_event.is_set() or not self.log_queue.empty():
try:
data = self.log_queue.get(timeout=1)
log_file = self._get_log_file()
json.dump(data, log_file)
log_file.write('\n')
log_file.flush()
except Empty:
continue
except Exception as e:
self.logger.error(f"Error writing to log file: {e}")
def _handle_tcp_client(self, client_socket, address):
self.logger.info(f"TCP connection established from {address}")
try:
with client_socket:
while True:
data = client_socket.recv(4096)
if not data:
break
self._handle_data(data, address)
except Exception as e:
self.logger.error(f"TCP client error: {e}")
self.logger.info(f"TCP connection closed from {address}")
def _handle_udp_client(self, data, address):
try:
self._handle_data(data, address)
except Exception as e:
self.logger.error(f"UDP client error: {e}")
def _start_tcp_server(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_socket.bind((self.host, self.port))
tcp_socket.listen(5)
self.logger.debug(f"TCP server running on {self.host}:{self.port}")
while not self.stop_event.is_set():
try:
client_socket, address = tcp_socket.accept()
threading.Thread(target=self._handle_tcp_client,
args=(client_socket, address),
daemon=True).start()
except Exception as e:
self.logger.error(f"TCP server error: {e}")
def _start_udp_server(self):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
udp_socket.bind((self.host, self.port))
self.logger.debug(f"UDP server running on {self.host}:{self.port}")
while not self.stop_event.is_set():
try:
data, address = udp_socket.recvfrom(4096)
self._handle_udp_client(data, address)
except Exception as e:
self.logger.error(f"UDP server error: {e}")
def start(self):
self.logger.info("Starting MultiProtocolServer...")
threading.Thread(target=self._log_writer, daemon=True).start()
tcp_thread = threading.Thread(target=self._start_tcp_server, daemon=True)
udp_thread = threading.Thread(target=self._start_udp_server, daemon=True)
tcp_thread.start()
udp_thread.start()
try:
tcp_thread.join()
udp_thread.join()
except KeyboardInterrupt:
self.stop()
def stop(self):
self.logger.info("Stopping MultiProtocolServer...")
self.stop_event.set()
if self.log_file:
self.log_file.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="MultiProtocol Server")
parser.add_argument("-p", "--port", type=int, default=5131,
help="Port to listen on")
parser.add_argument("-w", "--working-directory", type=str,
default="./logs", help="Directory to store log files")
args = parser.parse_args()
server = MultiProtocolServer("0.0.0.0", args.port, args.working_directory)
server.start()