[安全运营] - 基于特定的场景服务优化Tips

现在需要针对安全设备系统性能这块的关键点,添加监控,需要只把系统性能、服务状态等相关的日志发送到告警中心的kafka服务

现在手头上有一些可以直接利用的资源:

  • 设备本身只支持syslog;
  • 自用rsyslog服务、自用Kafka服务、告警中心Kafka服务、自用gohangout服务;

在rsyslog接收到设备发来的syslog之后,基于特定的字段内容进行筛选日志,若包含有特定字段内容的日志,直接发送到“告警中心Kafka”;

“type”: “系统”

json

# For Waf System Monitor
if $fromhost-ip == '10.1.1.1' then {
:msg, contains, '"type": "系统"' {
    action(
        broker=["192.168.0.1:9092","192.168.0.2:9092","192.168.0.3:9092"]
        type="omkafka"
        topic="sec-waf-system-monitor-info"
        template="remotesyslog"
        queue.filename="sec-waf-system-monitor-info"
        partitions.number="3"
        queue.workerThreads="4"
        queue.spoolDirectory="/data/rsyslog/queue"
        queue.size="300000"
        queue.maxdiskspace="2G"
        queue.lowwatermark="20000"
        queue.highwatermark="200000"
        queue.timeoutEnqueue="0"
        queue.discardSeverity="4"
        queue.discardmark="250000"
        queue.type="LinkedList"
        queue.saveonshutdown="on"
        queue.dequeuebatchsize="4"
        errorFile="/data/rsyslog/rsyslog.err"
        confParam=[
        "compression.codec=gzip",
        "socket.timeout.ms=1000",
        "socket.keepalive.enable=true"
        ]
        partitions.auto="on"
        resubmitonfailure="on"
        )
        }
}

利用gohangout,将已经存入自用Kafka的日志,进行消费筛选,从新写入到“告警中心Kafka”;

“type”: “系统”

json

inputs:
    - Kafka:
        topic:
            sec-waf-log: 1
        codec: json
        consumer_settings:
            bootstrap.servers: "kafka-master1:9092,kafka-master2:9092,kafka-master3:9092"
            group.id: kafka
            auto.commit.interval.ms: '3000'
            messages_queue_length: 10

// 过滤掉所有字段值不符合的日志,只要是type的值不是系统,处理方式为丢弃
filters:
    - Drop:
          if:
            - '!EQ(type,"系统")'

outputs:
    - Kafka:
        topic: sec-waf-system-monitor-info
        producer_settings:
            bootstrap.servers: 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
            flush.interval.ms: "3000"
            metadata.max.age.ms: "10000"

在rsyslog接收到设备发来的syslog之后,基于特定的字段内容进行筛选日志,若包含有特定字段内容的日志,通过http的接口调用,发送到“告警中心Kafka”的 API接口;

“type”: “系统”

这块内容只做了了解,但是没有真实使用,还是选择了方法1的简单粗暴方式;

json

action(
    type="omhttp"
    server="your.http.server"
    serverport="80"
    usehttps="off"
    url="/your/api/endpoint"
    restpath="rest/endpoint"  # 指定 API 端点
    dynurl="off"
    httpmethod="post"  # 指定使用 POST 方法
    httpheader="Authorization: Bearer YOUR_ACCESS_TOKEN"
    template="MyCustomTemplate"
)

配置完成rsyslog,记得重启服务,观察服务是否正常! 配置完成rsyslog,记得重启服务,观察服务是否正常! 配置完成rsyslog,记得重启服务,观察服务是否正常!

相关内容