Added logger.py for Docker

This commit is contained in:
netkas 2025-03-14 15:25:34 -04:00
parent 167a010332
commit 6fee098407
Signed by: netkas
GPG key ID: 4D8629441B76E4CC

345
docker/logger.py Normal file
View file

@ -0,0 +1,345 @@
import argparse
import json
import logging
import os
import socket
import threading
from datetime import datetime
from queue import Queue, Empty
from enum import Enum
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import colorama
from colorama import Fore, Back, Style
# Initialize colorama for cross-platform color support
colorama.init()
class LogLevel(str, Enum):
DEBUG = "DBG"
VERBOSE = "VRB"
INFO = "INFO"
WARNING = "WRN"
ERROR = "ERR"
CRITICAL = "CRT"
@classmethod
def to_python_level(cls, level: str) -> int:
return {
cls.DEBUG: logging.DEBUG,
cls.VERBOSE: logging.DEBUG,
cls.INFO: logging.INFO,
cls.WARNING: logging.WARNING,
cls.ERROR: logging.ERROR,
cls.CRITICAL: logging.CRITICAL
}.get(level, logging.INFO)
@classmethod
def get_color(cls, level: str) -> str:
return {
cls.DEBUG: Fore.CYAN,
cls.VERBOSE: Fore.BLUE,
cls.INFO: Fore.GREEN,
cls.WARNING: Fore.YELLOW,
cls.ERROR: Fore.RED,
cls.CRITICAL: Fore.RED + Back.WHITE
}.get(level, Fore.WHITE)
@dataclass
class StackFrame:
file: Optional[str]
line: Optional[int]
function: Optional[str]
args: Optional[List[Any]]
class_name: Optional[str]
call_type: str = 'static'
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'StackFrame':
return cls(
file=str(data.get('file')) if data.get('file') else None,
line=int(data['line']) if data.get('line') is not None else None,
function=str(data.get('function')) if data.get('function') else None,
args=data.get('args'),
class_name=str(data.get('class')) if data.get('class') else None,
call_type=str(data.get('callType', 'static'))
)
def format(self) -> str:
location = f"{self.file or '?'}:{self.line or '?'}"
if self.class_name:
call = f"{self.class_name}{self.call_type}{self.function or ''}"
else:
call = self.function or ''
args_str = ""
if self.args:
args_str = f"({', '.join(str(arg) for arg in self.args)})"
return f"{Fore.BLUE}{call}{Style.RESET_ALL}{args_str} in {Fore.CYAN}{location}{Style.RESET_ALL}"
class ExceptionDetails:
def __init__(self, name: str, message: str, code: Optional[int],
file: Optional[str], line: Optional[int],
trace: List[StackFrame], previous: Optional['ExceptionDetails']):
self.name = name
self.message = message
self.code = code
self.file = file
self.line = line
self.trace = trace
self.previous = previous
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> Optional['ExceptionDetails']:
if not data:
return None
trace = []
if 'trace' in data and isinstance(data['trace'], list):
trace = [StackFrame.from_dict(frame) for frame in data['trace']
if isinstance(frame, dict)]
previous = None
if 'previous' in data and isinstance(data['previous'], dict):
previous = cls.from_dict(data['previous'])
return cls(
name=str(data.get('name', '')),
message=str(data.get('message', '')),
code=int(data['code']) if data.get('code') is not None else None,
file=str(data.get('file')) if data.get('file') else None,
line=int(data['line']) if data.get('line') is not None else None,
trace=trace,
previous=previous
)
def format(self, level: int = 0) -> str:
indent = " " * level
parts = []
# Exception header
header = f"{indent}{Fore.RED}{self.name}"
if self.code is not None and 0:
header += f" {Fore.YELLOW}:{self.code}{Style.RESET_ALL}"
# Message
header += f"{Fore.WHITE}:{Style.RESET_ALL} {self.message}"
# Location
if self.file and self.line:
header += f"{Fore.WHITE} at {Style.RESET_ALL}{self.file}:{self.line}"
parts.append(header)
# Stack trace
if self.trace:
parts.append(f"{indent}{Fore.WHITE}Stack trace:{Style.RESET_ALL}")
for frame in self.trace:
parts.append(f"{indent}{frame.format()}")
# Previous exception
if self.previous:
parts.append(f"{indent}{Fore.YELLOW}Caused by:{Style.RESET_ALL}")
parts.append(self.previous.format(level + 1))
return "\n".join(parts)
class ColoredLogger(logging.Logger):
def __init__(self, name: str):
super().__init__(name)
self.formatter = logging.Formatter(
f'%(asctime)s {Fore.WHITE}[%(levelname)s]{Style.RESET_ALL} %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setFormatter(self.formatter)
self.addHandler(console_handler)
class MultiProtocolServer:
def __init__(self, host: str, port: int, working_directory: str):
self.host = host
self.port = port
self.working_directory = working_directory
self.log_queue: Queue = Queue()
self.current_date = datetime.now().strftime('%Y-%m-%d')
self.log_file = None
self.stop_event = threading.Event()
os.makedirs(self.working_directory, exist_ok=True)
# Set up colored logging
logging.setLoggerClass(ColoredLogger)
self.logger = logging.getLogger("MultiProtocolServer")
self.logger.setLevel(logging.DEBUG)
def _handle_log_event(self, data: Dict[str, Any], address: tuple) -> None:
"""Process and format a structured log event with colors and proper formatting."""
try:
app_name = data.get('application_name', 'Unknown')
timestamp = data.get('timestamp')
if timestamp:
try:
timestamp = datetime.fromtimestamp(int(timestamp))
timestamp = timestamp.strftime('%Y-%m-%d %H:%M:%S')
except (ValueError, TypeError):
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
else:
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
level = data.get('level', 'INFO')
message = data.get('message', '')
# Format the log message with colors
color = LogLevel.get_color(level)
log_message = f"{color}[{app_name}]{Style.RESET_ALL} {message}"
# Handle exception if present
exception_data = data.get('exception')
if exception_data:
exception = ExceptionDetails.from_dict(exception_data)
if exception:
log_message += f"\n{exception.format()}"
# Log with appropriate level
python_level = LogLevel.to_python_level(level)
self.logger.log(python_level, log_message)
# Add to log queue for file logging
self.log_queue.put({
"timestamp": timestamp,
"address": address,
"data": data
})
except Exception as e:
self.logger.error(f"Error processing log event: {e}", exc_info=True)
def _handle_data(self, data: bytes, address: tuple) -> None:
"""Process incoming data and attempt to parse as JSON."""
try:
decoded_data = data.decode('utf-8').strip()
try:
json_data = json.loads(decoded_data)
# Handle structured log event
self._handle_log_event(json_data, address)
except json.JSONDecodeError:
# Log raw data if not valid JSON
self.logger.info(f"Received non-JSON data from {address}: {decoded_data}")
self.log_queue.put({
"timestamp": datetime.now().isoformat(),
"address": address,
"data": decoded_data
})
except Exception as e:
self.logger.error(f"Data handling error: {e}")
# Rest of the class remains the same...
def _get_log_file(self):
date = datetime.now().strftime('%Y-%m-%d')
if date != self.current_date or self.log_file is None:
if self.log_file:
self.log_file.close()
self.current_date = date
filename = os.path.join(self.working_directory, f"log{date}.jsonl")
self.log_file = open(filename, 'a')
return self.log_file
def _log_writer(self):
while not self.stop_event.is_set() or not self.log_queue.empty():
try:
data = self.log_queue.get(timeout=1)
log_file = self._get_log_file()
json.dump(data, log_file)
log_file.write('\n')
log_file.flush()
except Empty:
continue
except Exception as e:
self.logger.error(f"Error writing to log file: {e}")
def _handle_tcp_client(self, client_socket, address):
self.logger.info(f"TCP connection established from {address}")
try:
with client_socket:
while True:
data = client_socket.recv(4096)
if not data:
break
self._handle_data(data, address)
except Exception as e:
self.logger.error(f"TCP client error: {e}")
self.logger.info(f"TCP connection closed from {address}")
def _handle_udp_client(self, data, address):
try:
self._handle_data(data, address)
except Exception as e:
self.logger.error(f"UDP client error: {e}")
def _start_tcp_server(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
tcp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_socket.bind((self.host, self.port))
tcp_socket.listen(5)
self.logger.debug(f"TCP server running on {self.host}:{self.port}")
while not self.stop_event.is_set():
try:
client_socket, address = tcp_socket.accept()
threading.Thread(target=self._handle_tcp_client,
args=(client_socket, address),
daemon=True).start()
except Exception as e:
self.logger.error(f"TCP server error: {e}")
def _start_udp_server(self):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
udp_socket.bind((self.host, self.port))
self.logger.debug(f"UDP server running on {self.host}:{self.port}")
while not self.stop_event.is_set():
try:
data, address = udp_socket.recvfrom(4096)
self._handle_udp_client(data, address)
except Exception as e:
self.logger.error(f"UDP server error: {e}")
def start(self):
self.logger.info("Starting MultiProtocolServer...")
threading.Thread(target=self._log_writer, daemon=True).start()
tcp_thread = threading.Thread(target=self._start_tcp_server, daemon=True)
udp_thread = threading.Thread(target=self._start_udp_server, daemon=True)
tcp_thread.start()
udp_thread.start()
try:
tcp_thread.join()
udp_thread.join()
except KeyboardInterrupt:
self.stop()
def stop(self):
self.logger.info("Stopping Logging Server...")
self.stop_event.set()
if self.log_file:
self.log_file.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Logging Server")
parser.add_argument("-p", "--port", type=int, default=8080,
help="Port to listen on")
parser.add_argument("-w", "--working-directory", type=str,
default="./logs", help="Directory to store log files")
args = parser.parse_args()
server = MultiProtocolServer("0.0.0.0", args.port, args.working_directory)
server.start()