UEBA pipeline implementation¶
Dr. Crucible had been staring at authentication logs for three weeks when he noticed it: a user who logged in from Ankh-Morpork at 09:00 and from Tsort at 09:15. Physically impossible. The rules-based detection system had no rule for this, because the rules system checked for impossible travel within a single session, and these were two separate sessions. The gap was not a failure of the rules system; rules are good at catching what they know about. The gap was in the space between rules: the subtle, gradual, statistical deviations that a patient attacker or a compromised account produces over days and weeks. Machine learning does not replace rules. It fills the space between them.
Architecture overview¶
The UEBA pipeline ingests events from three data sources, processes them in a single microservice, and writes results back to Graylog.
Data sources:
Keycloak authentication events, forwarded to Graylog via the Graylog GELF input. Every login, logout, token issuance, MFA attempt, and failed authentication generates a GELF message.
Teleport access events, forwarded via syslog. Every session start, session end, and node connection is logged.
Wazuh auditd events, enriched by the Wazuh pipeline before they reach Graylog. Commands executed on monitored hosts are logged with the executing user’s identity.
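Each source arrives with a different username field, which the routing rule later in this section normalises into a single `ueba_username` field. A minimal Python sketch of that normalisation logic (the event payloads are invented; only the field names `keycloak_user`, `teleport_user`, and `auditd_user` come from the pipeline):

```python
def ueba_username(event: dict) -> str:
    """Pick the first username field present, mirroring the Graylog routing rule.

    Sources are checked in the same priority order as the pipeline rule:
    Keycloak, then Teleport, then auditd.
    """
    for field in ("keycloak_user", "teleport_user", "auditd_user"):
        if event.get(field):
            return event[field]
    return "unknown"

# Invented example events, one per source
keycloak_event = {"keycloak_user": "moist.lipwig", "src_ip": "10.1.2.3"}
teleport_event = {"teleport_user": "adora.dearheart", "src_ip": "10.1.2.4"}
auditd_event = {"auditd_user": "drumknott", "src_ip": "10.1.2.5"}
```

Events that carry none of the three fields fall through to `"unknown"` and are skipped by the processor.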
Processing pipeline:
Keycloak --> Graylog GELF input   --+
Teleport --> Graylog Syslog input --+--> Graylog pipeline --> Kafka topic: ueba-events
Wazuh    --> Graylog Beats input  --+

Kafka: ueba-events --> ueba-processor (Python, Kubernetes) --> Graylog REST API
                                                               --> anomaly_score field
                                                               --> routed to UEBA Alerts stream
Kafka setup¶
A single-node Kafka instance runs in the ueba namespace on the Kubernetes cluster. Single-node is acceptable because the UEBA pipeline is not a safety-critical system; a brief Kafka outage means events are not scored, but they are not lost (Graylog buffers them).
# kafka-values.yaml for Helm deployment
replicaCount: 1
zookeeper:
  replicaCount: 1
persistence:
  enabled: true
  size: 50Gi
  storageClass: hcloud-volumes
config:
  log.retention.hours: 168
  log.retention.bytes: -1
  num.partitions: 4
  default.replication.factor: 1
The Kafka topic ueba-events is created with 4 partitions, two per ueba-processor replica: with two replicas, each consumes from 2 partitions, providing parallelism without complex partition assignment.
Topic configuration:
kafka-topics.sh --create \
  --bootstrap-server kafka.ueba.svc.cluster.local:9092 \
  --topic ueba-events \
  --partitions 4 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete
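Keeping all of a user's events on a single partition preserves per-user ordering, which matters when scoring gradual behavioural drift. If the producer keys messages by username, Kafka's partitioner guarantees this automatically. The sketch below illustrates the principle with a stable MD5-based hash; Kafka's actual default partitioner uses murmur2, so the partition numbers it produces will differ, but the property (same user, same partition) is the same:

```python
import hashlib

NUM_PARTITIONS = 4  # matches the ueba-events topic configuration

def partition_for(username: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a username to a partition with a stable hash.

    Illustrative only: Kafka's default partitioner uses murmur2, not MD5,
    but any deterministic hash gives the same-user-same-partition property.
    """
    digest = hashlib.md5(username.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```

In practice this means passing the username as the message key when producing, rather than computing partitions by hand.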
Graylog Kafka output¶
Graylog forwards enriched authentication and access events to the ueba-events Kafka topic using the Graylog Kafka output plugin. The output is configured on the UEBA source stream, which collects events from all three data sources.
The Graylog pipeline rule that routes events to this stream:
rule "Route to UEBA pipeline"
when
  (
    has_field("keycloak_user") OR
    has_field("teleport_user") OR
    has_field("auditd_user")
  ) AND
  has_field("src_ip")
then
  let username = first_non_null(
    $message.keycloak_user,
    $message.teleport_user,
    $message.auditd_user,
    "unknown"
  );
  set_field("ueba_username", username);
  route_to_stream("UEBA Source Events");
end
ueba-processor microservice¶
The ueba-processor is a Python service that consumes from the ueba-events Kafka topic, scores each event against the user’s baseline model, and writes the anomaly score back to Graylog.
Kubernetes deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ueba-processor
  namespace: ueba
spec:
  replicas: 2
  selector:
    matchLabels:
      app: ueba-processor
  template:
    metadata:
      labels:
        app: ueba-processor
    spec:
      containers:
        - name: ueba-processor
          image: registry.golemtrust.am/security/ueba-processor:latest
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "kafka.ueba.svc.cluster.local:9092"
            - name: KAFKA_TOPIC
              value: "ueba-events"
            - name: GRAYLOG_API_URL
              value: "https://graylog.golemtrust.am/api"
            - name: GRAYLOG_API_TOKEN
              valueFrom:
                secretKeyRef:
                  name: ueba-secrets
                  key: graylog-api-token
            - name: MODEL_BUCKET
              value: "ueba-models.golemtrust.am"
            - name: POSTGRES_URI
              valueFrom:
                secretKeyRef:
                  name: ueba-secrets
                  key: postgres-uri
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
Processor main loop¶
The processor consumes events, extracts features, scores them, and writes scores back:
# ueba_processor/main.py (simplified structure)
import json
import os

from kafka import KafkaConsumer

from feature_extraction import extract_features
from model_loader import load_model_for_user
from graylog_client import update_message_field

KAFKA_BOOTSTRAP_SERVERS = os.environ["KAFKA_BOOTSTRAP_SERVERS"]

consumer = KafkaConsumer(
    'ueba-events',
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    group_id='ueba-processor-group',
    auto_offset_reset='latest'
)

for message in consumer:
    event = message.value
    username = event.get('ueba_username')
    if not username or username == 'unknown':
        continue

    features = extract_features(event)
    model = load_model_for_user(username)
    if model is None:
        # User has no baseline yet (first 30 days); skip scoring
        continue

    raw_score = model.decision_function([features])[0]
    # Isolation Forest decision_function values fall roughly in [-0.5, 0.5];
    # negative scores are anomalies, and the more negative, the more anomalous.
    # Map to 0.0 (normal) .. 1.0 (highly anomalous), clamping the extremes.
    normalised_score = max(0.0, min(1.0, 0.5 - raw_score))

    update_message_field(
        message_id=event['_id'],
        fields={
            'ueba_anomaly_score': normalised_score,
            'ueba_scored': True,
            'ueba_model_version': model.version
        }
    )
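The main loop relies on `extract_features`, which is not shown in this document. A hypothetical sketch of what such a function might look like, assuming events carry a `timestamp` (ISO 8601), a `src_ip`, and an `event_type` field; the specific features chosen here (cyclic hour-of-day encoding, weekday, coarse source-network position) are illustrative, not the production feature set:

```python
import math
from datetime import datetime, timezone

# Event types the pipeline's three sources produce; order fixes the encoding.
EVENT_TYPES = ["login", "logout", "session_start", "session_end", "command"]

def extract_features(event: dict) -> list[float]:
    """Map a raw UEBA event to a fixed-length numeric feature vector."""
    ts = datetime.fromisoformat(event["timestamp"]).astimezone(timezone.utc)
    # Encode hour-of-day cyclically so 23:00 and 01:00 are close together
    hour_angle = 2 * math.pi * ts.hour / 24
    octets = [int(o) for o in event["src_ip"].split(".")]
    etype = event.get("event_type", "login")
    return [
        math.sin(hour_angle),
        math.cos(hour_angle),
        float(ts.weekday()),                          # 0 = Monday
        float(octets[0]), float(octets[1]),           # coarse network position
        float(EVENT_TYPES.index(etype)) if etype in EVENT_TYPES else -1.0,
    ]
```

Whatever the real feature set is, it must be identical at training and scoring time, so in practice this module is shared between the baseline trainer and the processor.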
Model loading and caching¶
Models are stored as pickle files in the ueba-models.golemtrust.am Hetzner Object Storage bucket. The processor loads models at startup and checks for updates every hour.
# Model file naming convention
ueba-models.golemtrust.am/
  models/
    {username}-current.pkl
    {username}-previous.pkl
    {username}-v{n}.pkl
The processor maintains an in-memory cache of loaded models. When the hourly check finds a newer current.pkl for a user, it loads the new model and replaces the cached version.
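A minimal sketch of that cache, assuming an S3-compatible client (boto3 pointed at the Hetzner Object Storage endpoint). The `head_object`/`get_object` calls and `ETag` field are standard boto3; the `ModelCache` class itself is hypothetical, and comparing ETags is one way to avoid re-downloading an unchanged model on the hourly check:

```python
import pickle
import time

class ModelCache:
    """In-memory per-user model cache with periodic refresh from object storage."""

    def __init__(self, s3, bucket: str, refresh_seconds: int = 3600):
        self.s3 = s3                  # boto3 S3 client (or compatible stub)
        self.bucket = bucket
        self.refresh_seconds = refresh_seconds
        self._cache = {}              # username -> (etag, model, loaded_at)

    def get(self, username: str):
        key = f"models/{username}-current.pkl"
        entry = self._cache.get(username)
        # Serve from cache until the refresh interval elapses
        if entry and time.monotonic() - entry[2] < self.refresh_seconds:
            return entry[1]
        try:
            head = self.s3.head_object(Bucket=self.bucket, Key=key)
        except Exception:
            # No baseline yet, or storage unreachable: fall back to cache if any
            return entry[1] if entry else None
        etag = head["ETag"]
        if entry and entry[0] == etag:
            # Unchanged upstream; just reset the refresh timer
            self._cache[username] = (etag, entry[1], time.monotonic())
            return entry[1]
        body = self.s3.get_object(Bucket=self.bucket, Key=key)["Body"].read()
        model = pickle.loads(body)
        self._cache[username] = (etag, model, time.monotonic())
        return model
```

Because models arrive as pickles, the processor must only ever load them from the trusted ueba-models bucket; unpickling untrusted data executes arbitrary code.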