Riemann is a flexible, high-performance event stream processor for monitoring distributed systems. It aggregates, filters, and routes events from many sources to multiple destinations.
| File/Directory | Path | Purpose |
|---|---|---|
| Main config | /etc/riemann/riemann.config | Main configuration file |
| Plugins | /usr/share/riemann/ | Plugin directory |
| Logs | /var/log/riemann/ | Log files |
| Systemd service | /etc/systemd/system/riemann.service | Service definition |
;; /etc/riemann/riemann.config
; Load necessary namespaces
(ns riemann.config
(:require [riemann.core :refer :all]
[riemann.logging :refer :all]
[riemann.streams :refer :all]))
; ========================
; Logging Configuration
; ========================
; Initialise Riemann's logging with the options below (log file path and level).
(let [log-options {:level :info
                   :file  "/var/log/riemann/riemann.log"}]
  (logging/init log-options))
; ========================
; Server Configuration
; ========================
; Listen on all interfaces.
; TCP and UDP may share port 5555 because they are different protocols.
; The WebSocket server is itself a TCP listener, so it cannot also bind 5555;
; it gets its own port (5556 is Riemann's conventional WebSocket port).
(let [host "0.0.0.0"]
  ; TCP server
  (tcp-server {:host host :port 5555})
  ; UDP server
  (udp-server {:host host :port 5555})
  ; WebSocket server
  (ws-server {:host host :port 5556}))
; ========================
; Index Configuration
; ========================
; Create the index for fast event lookup.
; (index) takes index options, not child streams: it returns the index stream,
; which must then be used as a child inside (streams ...).
(let [index (index)
      email (mailer {:from "riemann@example.com"})]
  (streams
    ; CPU events above 80%: store them in the index and notify the admin.
    ; The bare `metric` test skips events without a metric so `>` never sees nil.
    (where (and (service "cpu")
                metric
                (> metric 0.8))
      index
      (email "admin@example.com"))))
; ========================
; Streams Configuration
; ========================
; Define email stream.
; (mailer opts) returns a function that takes recipient addresses and returns
; a stream; SMTP settings use the keys :host, :port, :user and :pass.
; Prefer loading the password from the environment over committing it here.
(def email-stream
  (let [send-email (mailer {:host "smtp.example.com"
                            :port 587
                            :user "riemann@example.com"
                            :pass "EmailPassword123!"
                            :from "riemann@example.com"
                            ; :subject receives the sequence of events being mailed
                            :subject (fn [events]
                                       (str "[Riemann Alert] "
                                            (riemann.common/subject events)))})]
    (send-email "admin@example.com")))
; Define Slack stream.
; slack takes two maps: credentials (here an incoming-webhook URL) and
; message options (channel, bot username, icon).
(def slack-stream
  (slack {:webhook_uri "https://hooks.slack.com/services/XXX/YYY/ZZZ"}
         {:channel "#alerts"
          :username "Riemann"
          :icon ":warning:"}))
; Define PagerDuty stream.
; (pagerduty {:key ...}) returns a map of streams (:trigger, :acknowledge,
; :resolve) rather than a single stream, so pick :trigger to open incidents.
(def pagerduty-stream
  (:trigger (pagerduty {:key "your-pagerduty-service-key"})))
; Define InfluxDB stream.
; Events are handed to a bounded async queue so a slow InfluxDB cannot block
; the rest of the stream pipeline.
; The former (with {:host "localhost" ...}) wrapper was removed: `with` sets
; fields on the *event*, so every event's :host was rewritten to "localhost"
; before being stored.
; NOTE(review): confirm :block-size / :block-timeout are honoured by
; async-queue!; only :queue-size is known to me as a queue option.
(def influxdb-stream
  (async-queue! :influxdb {:queue-size 10000
                           :block-size 100
                           :block-timeout 100}
    (influxdb {:host "localhost"
               :port 8086
               :db "riemann"})))
; Define Elasticsearch stream.
; The elasticsearch stream is configured with an endpoint URL and an index
; name (:es-endpoint / :es-index) rather than separate host/port keys.
(def elasticsearch-stream
  (elasticsearch {:es-endpoint "http://localhost:9200"
                  :es-index "riemann"}))
; ========================
; Filtering and Routing
; ========================
; Route events based on service.
;
; Conventions used below:
;  * Output streams (email-stream, slack-stream, ...) are listed as children;
;    they are never called directly, Riemann calls them with each event.
;  * Thresholds are plain comparisons on `metric`; the bare `metric` test in
;    the outer `where` drops events without a metric so `>` never sees nil.
;  * (else ...) makes the lower threshold apply only when the higher one did
;    not match, so one event never produces two emails.
(streams
  ; CPU alerts
  (where (and (service #"cpu") metric)
    (where (> metric 0.9)
      email-stream
      slack-stream
      (else
        (where (> metric 0.7)
          email-stream))))

  ; Memory alerts
  (where (and (service #"memory") metric)
    (where (> metric 0.9)
      email-stream
      slack-stream
      pagerduty-stream
      (else
        (where (> metric 0.8)
          email-stream))))

  ; Disk alerts
  (where (and (service #"disk") metric)
    (where (> metric 0.95)
      email-stream
      slack-stream
      pagerduty-stream
      (else
        (where (> metric 0.85)
          email-stream))))

  ; Network alerts
  (where (and (service #"network") metric)
    (where (> metric 1000000)
      email-stream))

  ; Service health
  (where (service #"service-health")
    (where (state "critical")
      email-stream
      slack-stream
      pagerduty-stream)
    (where (state "warning")
      email-stream))

  ; Log all events to Elasticsearch
  elasticsearch-stream

  ; Log metrics to InfluxDB (only events that carry a metric)
  (where metric
    influxdb-stream))
; ========================
; Aggregation and Windows
; ========================
; Aggregate CPU usage over 5 minutes.
; Must live inside (streams ...), otherwise the stream is built and discarded.
; For each host, collect events into 300-second windows and emit one event
; whose metric is the mean of the window. folds/mean yields nil for an empty
; window, and smap does not forward nil, so empty windows send nothing.
(streams
  (where (service "cpu")
    (by [:host]
      (fixed-time-window 300
        (smap folds/mean
          email-stream)))))
; Calculate average response time.
; Must live inside (streams ...), otherwise the stream is built and discarded.
; For each host, keep the events from the last 60 seconds and emit their mean.
; A moving window emits on every incoming event, so consider throttling the
; email child if this service reports frequently.
(streams
  (where (service "response-time")
    (by [:host]
      (moving-time-window 60
        (smap folds/mean
          email-stream)))))
; ========================
; Custom Functions
; ========================
; Custom threshold classifier.
(defn threshold
  "Returns a function that classifies an event by its :metric.

  warn - metric value at or above which the event becomes \"warning\"
  crit - metric value at or above which the event becomes \"critical\"

  The returned function takes an event map and returns a copy with :state set.
  For warning/critical it also appends \" WARNING\" / \" CRITICAL\" to :service.
  Events with no metric, or a metric below warn, are marked \"ok\"."
  [warn crit]
  (letfn [(flag [event state suffix]
            (assoc event
                   :state state
                   :service (str (:service event) suffix)))]
    (fn [event]
      (let [value (:metric event)]
        (if-not value
          ; No metric to compare: treat as healthy.
          (assoc event :state "ok")
          (cond
            (>= value crit) (flag event "critical" " CRITICAL")
            (>= value warn) (flag event "warning" " WARNING")
            :else           (assoc event :state "ok")))))))
; Use custom threshold.
; (threshold 70 90) is a plain event -> event function, not a stream: used as a
; child on its own it would return the rewritten event to nobody. smap applies
; it to each event and forwards the result to email-stream.
; Must live inside (streams ...), otherwise the stream is built and discarded.
(streams
  (where (service "custom-metric")
    (smap (threshold 70 90)
      email-stream)))
; ========================
; Rate Limiting
; ========================
; Limit email notifications to 1 per minute.
; `rate` does not limit anything: it sums metrics and emits a per-second rate.
; `throttle n dt` passes at most n events every dt seconds, which is the
; intended behaviour here.
; Must live inside (streams ...), otherwise the stream is built and discarded.
(streams
  (where (service #"critical")
    (throttle 1 60
      email-stream)))
; ========================
; Event Expiration
; ========================
; Expire events after 1 hour.
; Expiry is driven by each event's :ttl plus a periodic reaper, not by a stream:
;  * the reaper below scans the index every 60 seconds and expires events
;    whose :ttl has elapsed;
;  * to give events a one-hour lifetime, wrap the index stream where events
;    are indexed, e.g. (default :ttl 3600 index).
(periodically-expire 60)
; TCP/UDP inputs: both listen on all interfaces, port 5555
(tcp-server {:host "0.0.0.0" :port 5555})
(udp-server {:host "0.0.0.0" :port 5555})
; WebSocket input: all interfaces, port 5556
(ws-server {:host "0.0.0.0" :port 5556})
; HTTP input: all interfaces, port 5557
; NOTE(review): I could not find an `http-server` function in Riemann's config
; API (tcp-server, udp-server, ws-server and sse-server are the ones I know) —
; confirm it exists in the installed version before relying on this line.
(http-server {:host "0.0.0.0" :port 5557})
; Email output.
; (mailer opts) returns a function that takes the recipient addresses and
; returns a stream; SMTP settings use the keys :host and :port.
(def email-output
  (let [send-email (mailer {:host "smtp.example.com"
                            :port 587
                            :from "riemann@example.com"})]
    (send-email "admin@example.com" "ops@example.com")))
; Slack output.
; slack takes a credentials map (incoming-webhook URL) followed by a message
; options map.
(def slack-output
  (slack {:webhook_uri "https://hooks.slack.com/services/XXX/YYY/ZZZ"}
         {:channel "#monitoring"}))
; HipChat output: built from an API token and the target room name.
(def hipchat-output
  (let [room-settings {:room  "Monitoring"
                       :token "your-hipchat-token"}]
    (hipchat room-settings)))
; SNS output.
; Riemann's SNS integration is `sns-publisher`: it takes the AWS credentials
; and returns a function that takes topic ARN(s) and returns a stream.
; Prefer loading the keys from the environment over committing them here.
(def sns-output
  (let [publish (sns-publisher {:access-key "YOUR_ACCESS_KEY"
                                :secret-key "YOUR_SECRET_KEY"})]
    (publish "arn:aws:sns:us-east-1:123456789:riemann-alerts")))
; Email notification configuration
(defn email-alert
  "Returns a stream that emails admin@example.com for every event whose metric
  is greater than `threshold`. Events without a metric are ignored.

  The mailer calls :subject and :body with the *sequence* of events being sent
  (one event normally, several after rollup/batching), so both build their
  text from that sequence."
  [threshold]
  (let [describe   (fn [event]
                     (str "Service: " (:service event) "\n"
                          "Host: " (:host event) "\n"
                          "State: " (:state event) "\n"
                          "Metric: " (:metric event) "\n"
                          "Time: " (:time event)))
        send-email (mailer {:from "riemann@example.com"
                            :subject (fn [events]
                                       (let [event (first events)]
                                         (str "[Riemann] " (:service event)
                                              " on " (:host event))))
                            :body (fn [events]
                                    ; One paragraph per event, separated by a blank line.
                                    (clojure.string/join "\n\n" (map describe events)))})]
    ; The bare `metric` test keeps `>` from being called with nil.
    (where (and metric (> metric threshold))
      (send-email "admin@example.com"))))
; Slack notification configuration
(defn slack-alert
  "Returns a stream that posts each event to #alerts as a Slack attachment.

  slack takes a credentials map (incoming-webhook URL) and an options map.
  The :formatter turns an event into the message payload: the attachment is
  coloured \"danger\" for critical events and \"warning\" for everything else."
  []
  (slack {:webhook_uri "https://hooks.slack.com/services/XXX/YYY/ZZZ"}
         {:channel "#alerts"
          :username "Riemann"
          :icon ":warning:"
          :formatter (fn [event]
                       {:attachments
                        [{:color (if (= (:state event) "critical") "danger" "warning")
                          :title (str "Riemann Alert: " (:service event))
                          :fields [{:title "Host" :value (:host event) :short true}
                                   {:title "Service" :value (:service event) :short true}
                                   {:title "State" :value (:state event) :short true}
                                   {:title "Metric" :value (:metric event) :short true}]}]})}))
# Validate configuration syntax
# NOTE(review): confirm the installed `riemann` launcher supports a
# `validate-config` subcommand before relying on this.
riemann validate-config /etc/riemann/riemann.config
# Test configuration
riemann test /etc/riemann/riemann.config
# Restart Riemann so the edited configuration is loaded
sudo systemctl restart riemann
# Check status of the systemd unit
sudo systemctl status riemann
# View logs: follow the systemd journal, or follow the log file directly
sudo journalctl -u riemann -f
sudo tail -f /var/log/riemann/riemann.log
# Send test event via TCP
# NOTE(review): `riemann send` / `riemann query` look like subcommands of a
# separate Riemann client CLI rather than of the server launcher — confirm
# which package provides them.
riemann send -h localhost -p 5555 -s "test-service" -m 1.0 -S "ok"
# Send test event via UDP
# NOTE(review): this pipes a JSON document to the UDP port; Riemann's TCP/UDP
# listeners are expected to speak Protocol Buffers — confirm the server
# accepts this payload.
echo '{"host":"test","service":"test","metric":1.0,"state":"ok"}' | nc -u localhost 5555
# Check if listening (-t restricts the listing to TCP sockets, so a UDP
# listener on 5555 will not appear here)
netstat -tlnp | grep 5555
# Query events: the query 'true' matches everything
riemann query 'true'
# Check index for events whose service is exactly "cpu"
riemann query 'service = "cpu"'
# Send test event (service "cpu", metric 0.95, state "critical")
riemann send -h localhost -s "cpu" -m 0.95 -S "critical"
# Check logs for processing: follow the log file, showing only lines containing "cpu"
tail -f /var/log/riemann/riemann.log | grep cpu
Every deployment is unique, and we provide consulting tailored to yours.
Get personalized assistance: office@linux-server-admin.com | Contact Page