Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
"""RFC 5424 Syslog message serializer.

Copyright (c) 2025 MultiFactor
License: https://github.com/MultiDirectoryLab/MultiDirectory/blob/main/LICENSE
"""

import socket
from datetime import datetime, timezone
from typing import Any

from ldap_protocol.policies.audit.events.dataclasses import (
NormalizedAuditEvent,
)


class RFC5424Serializer:
Comment thread
Naksen marked this conversation as resolved.
"""Serializer for RFC 5424 compliant syslog messages."""

NILVALUE: str = "-"
UTF8_BOM: str = "\ufeff"

Comment thread
Misha-Shvets marked this conversation as resolved.
# SD-ID suffix for STRUCTURED-DATA: audit@32473
# Change to your registered Private Enterprise Number (PEN)
STRUCTURED_DATA_ID_SUFFIX: str = "32473"

SYSLOG_FACILITIES: dict[str, int] = {
Comment thread
milov-dmitriy marked this conversation as resolved.
"kernel": 0,
"user": 1,
"mail": 2,
"system": 3,
"security": 4,
"syslog": 5,
"printer": 6,
"network": 7,
"uucp": 8,
"clock": 9,
"authpriv": 10,
"ftp": 11,
"ntp": 12,
"audit": 13,
"alert": 14,
"cron": 15,
"local0": 16,
"local1": 17,
"local2": 18,
"local3": 19,
"local4": 20,
"local5": 21,
"local6": 22,
"local7": 23,
}

def __init__(
self,
app_name: str,
facility: str,
) -> None:
"""Initialize RFC 5424 serializer."""
self.app_name = app_name
self.facility = facility

def serialize(
self,
event: NormalizedAuditEvent,
structured_data: dict[str, Any],
syslog_version: int,
) -> str:
"""Serialize audit event to RFC 5424 format."""
severity = self._format_severity(event.severity)
timestamp = self._format_timestamp(event.timestamp)
hostname = self._format_hostname(event.hostname)
app_name = self._format_field(self.app_name, 48)
proc_id = self._format_field(event.service_name, 128)
msg_id = self._format_field(event.event_type, 32)
sd_str = self._format_structured_data(structured_data)
msg = self._format_message(event.syslog_message)

return (
f"<{severity}>{syslog_version} "
f"{timestamp} {hostname} {app_name} {proc_id} {msg_id} "
f"{sd_str}{msg}"
)

def _format_severity(self, severity: int) -> int:
"""Calculate PRIORITY value (RFC 5424 section 6.2.1)."""
if not 0 <= severity <= 7:
raise NotImplementedError(f"Severity must be 0-7, got {severity}")

facility_code = self.SYSLOG_FACILITIES.get(
self.facility.lower(),
self.SYSLOG_FACILITIES["authpriv"],
)

return (facility_code << 3) | severity

def _format_timestamp(self, timestamp: float) -> str:
"""Format TIMESTAMP field (RFC 5424 section 6.2.3)."""
dt = datetime.fromtimestamp(timestamp, tz=timezone.utc)
return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")

def _format_hostname(self, hostname: str | None) -> str:
"""Format HOSTNAME field (RFC 5424 section 6.2.4)."""
if not hostname:
hostname = socket.gethostname()
Comment thread
Misha-Shvets marked this conversation as resolved.

return self._format_field(hostname, 255)

def _format_field(
self,
value: str | None,
max_length: int,
) -> str:
"""Format generic RFC 5424 field."""
if not value:
return self.NILVALUE

sanitized = "".join(c for c in value if 33 <= ord(c) <= 126)[
:max_length
]
Comment thread
Misha-Shvets marked this conversation as resolved.

return sanitized or self.NILVALUE
Comment thread
Misha-Shvets marked this conversation as resolved.

def _format_structured_data(
self,
structured_data: dict[str, Any],
) -> str:
"""Format STRUCTURED-DATA field (RFC 5424 section 6.3)."""
if not structured_data:
return self.NILVALUE

params = []
for key, value in structured_data.items():
param_name = self._sanitize_param_name(str(key))
if not param_name:
continue

param_value = self._escape_param_value(str(value))
params.append(f'{param_name}="{param_value}"')

if not params:
return self.NILVALUE

sd_id = f"audit@{self.STRUCTURED_DATA_ID_SUFFIX}"
return f"[{sd_id} {' '.join(params)}]"

def _sanitize_param_name(self, name: str) -> str:
"""Sanitize PARAM-NAME for STRUCTURED-DATA.

RFC 5424 allows only printable ASCII (33-126)
except: =, space, ], "
Max length: 32 characters
"""
return "".join(
c
for c in name
if 33 <= ord(c) <= 126 and c not in ("=", " ", "]", '"')
)[:32]
Comment thread
Misha-Shvets marked this conversation as resolved.

def _escape_param_value(self, value: str) -> str:
"""Escape PARAM-VALUE for STRUCTURED-DATA."""
return (
value.replace("\\", "\\\\").replace('"', r"\"").replace("]", r"\]")
Comment thread
Misha-Shvets marked this conversation as resolved.
)

def _format_message(self, msg: str | None) -> str:
"""Format MSG field (RFC 5424 section 6.4)."""
if not msg:
return ""

return f" {self.UTF8_BOM}{msg}"
Comment thread
Misha-Shvets marked this conversation as resolved.
144 changes: 22 additions & 122 deletions app/ldap_protocol/policies/audit/events/service_senders/syslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
"""

import asyncio
import socket
import uuid
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any

from loguru import logger
Expand All @@ -19,43 +16,29 @@
)

from .base import AuditDestinationSenderABC
from .rfc5424_serializer import RFC5424Serializer


class SyslogSender(AuditDestinationSenderABC):
Comment thread
milov-dmitriy marked this conversation as resolved.
"""Syslog sender."""
"""Syslog sender with RFC 5424 support.

Sends audit events to syslog servers using RFC 5424 format.
Supports both TCP and UDP protocols.
"""

service_name: AuditDestinationServiceType = (
AuditDestinationServiceType.SYSLOG
)
SYSLOG_VERSION: int = 1
DEFAULT_TIMEOUT: int = 10
DEFAULT_FACILITY = "authpriv"
SYSLOG_FACILITIES: dict[str, int] = {
"kernel": 0,
"user": 1,
"mail": 2,
"system": 3,
"security": 4,
"syslog": 5,
"printer": 6,
"network": 7,
"uucp": 8,
"clock": 9,
"authpriv": 10,
"ftp": 11,
"ntp": 12,
"audit": 13,
"alert": 14,
"cron": 15,
"local0": 16,
"local1": 17,
"local2": 18,
"local3": 19,
"local4": 20,
"local5": 21,
"local6": 22,
"local7": 23,
}
SYSLOG_VERSION: int = 1

def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize syslog sender with RFC 5424 serializer."""
super().__init__(*args, **kwargs)
self.__rfc_serializer = RFC5424Serializer(
app_name=self.DEFAULT_APP_NAME,
Comment thread
Misha-Shvets marked this conversation as resolved.
facility="authpriv",
)

async def _send_udp(self, message: str) -> None:
"""Send UDP."""
Expand Down Expand Up @@ -84,107 +67,24 @@ async def _send_tcp(self, message: str) -> None:
writer.close()
await writer.wait_closed()

def generate_rfc5424_message(
self,
event: NormalizedAuditEvent,
structured_data: dict[str, Any],
) -> str:
"""Generate a syslog message according to RFC 5424."""
severity_code = event.severity
facility = self.DEFAULT_FACILITY
app_name = self.DEFAULT_APP_NAME
msg_id = str(uuid.uuid4())
message = event.syslog_message
hostname = event.hostname
proc_id = event.service_name

if not 0 <= severity_code <= 7:
raise ValueError("Severity code must be between 0 and 7")

facility_code = self.SYSLOG_FACILITIES.get(
facility.lower(),
self.SYSLOG_FACILITIES[self.DEFAULT_FACILITY],
)
priority = (facility_code << 3) | severity_code

# TIMESTAMP (RFC 5424 section 6.2.3)
dt = datetime.fromtimestamp(event.timestamp, tz=timezone.utc)
timestamp = dt.isoformat(
timespec="milliseconds",
).replace("+00:00", "Z")

# HOSTNAME (section 6.2.4)
hostname = (hostname or socket.gethostname() or "-")[:255]

# APP-NAME (section 6.2.5)
app_name = app_name or "-"
if len(app_name) > 48:
app_name = app_name[:48]

# PROCID (section 6.2.6)
proc_id = proc_id or "-"

# MSGID (section 6.2.7)
msg_id = msg_id or "-"

# STRUCTURED-DATA (section 6.3)
sd_str = self._format_structured_data(app_name, structured_data) or "-"

# MSG (section 6.4)
message = self._escape_message(message) if message else ""

return (
f"<{priority}>{self.SYSLOG_VERSION} {timestamp} "
f"{hostname} {app_name} {proc_id} {msg_id} "
f"{sd_str} {message}"
)

def _escape_message(self, msg: str) -> str:
"""Escape special chars in message (RFC 5424 section 6.4)."""
return " " + msg.replace("\n", " ").replace("\r", " ")

def _format_structured_data(
self,
app_name: str,
structured_data: dict[str, Any],
) -> str:
"""Format structured data according to RFC 5424 section 6.3."""
if not structured_data:
return ""

def escape_param_value(value: str) -> str:
return (
value.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("]", "\\]")
)

sd_id = f"{app_name}@{uuid.uuid4()}"
params = []

for k, v in structured_data.items():
if not k or "=" in k or " " in k or '"' in k:
continue
escaped_value = escape_param_value(str(v))
params.append(f'{k}="{escaped_value}"')

if not params:
return ""

return f"[{sd_id} {' '.join(params)}]"

async def send(self, event: NormalizedAuditEvent) -> None:
"""Send event."""
structured_data = deepcopy(event.destination_dict)

syslog_message = self.generate_rfc5424_message(
syslog_message = self.__rfc_serializer.serialize(
event=event,
structured_data=structured_data,
syslog_version=self.SYSLOG_VERSION,
)

if self._destination.protocol == AuditDestinationProtocolType.UDP:
callback = self._send_udp
elif self._destination.protocol == AuditDestinationProtocolType.TCP:
callback = self._send_tcp
else:
raise NotImplementedError(
f"Unsupported protocol: {self._destination.protocol}",
)

try:
await callback(syslog_message)
Expand Down
2 changes: 1 addition & 1 deletion interface
Loading