目录

最近在做日志收集,准备使用最简单的 rsyslog 来做,因为日志量不是很大,而且大多数 Linux 自带安装 rsyslog,本文记录一下使用上的一些功能和问题。

需求:采集服务器日志和一些 log 文件日志,存入ES中。

安装 rsyslog

对于大多数基于 Linux 的系统,可以直接使用包管理器安装:

sudo apt-get install rsyslog  # 对于Debian/Ubuntu
sudo yum install rsyslog      # 对于CentOS/RHEL

版本说明

查看版本

rsyslogd -v

虽然 Rsyslog 很早就支持直接输出数据给 elasticsearch,但如果你使用的是 v8.4 以下的版本,我们这里并不推荐这种方式。因为 normalize 语法还是比较简单,只支持时间,字符串,数字,ip 地址等几种。在复杂条件下远比不上完整的正则引擎。

因为采集会涉及到一些新功能,比如将日志发送到ES的某个pipeline中,这个功能在较老的版本是没有的,所以推荐大家更新到最新的 8.2402.0 版本,如果你不需要这些功能,可以跳过版本更新,先试试机器上默认的版本是否可以完成你的工作。

更新rsyslog

我的 CentOS 上安装的默认版本为 8.24.0-57.el7_9.3,直接更新到当前最新版 8.2402.0

> rsyslogd -v
rsyslogd 8.24.0-57.el7_9.3, compiled with:
    PLATFORM:               x86_64-redhat-linux-gnu
    PLATFORM (lsb_release -d):      
    FEATURE_REGEXP:             Yes
    GSSAPI Kerberos 5 support:      Yes
    FEATURE_DEBUG (debug build, slow code): No
    32bit Atomic operations supported:  Yes
    64bit Atomic operations supported:  Yes
    memory allocator:           system default
    Runtime Instrumentation (slow code):    No
    uuid support:               Yes
    Number of Bits in RainerScript integers: 64

在线安装

1. 添加软件源

DOWNLOAD OTHER - rsyslog

在 RHEL/CENTOS 上安装 rsyslog - rsyslog --- Install rsyslog on RHEL/CENTOS - rsyslog

cd /etc/yum.repos.d/
wget http://rpms.adiscon.com/v8-stable/rsyslog.repo # for CentOS 7,8,9
yum install rsyslog
yum install liblognorm5 # 更新rsyslog不会更新liblognorm,需要手动更新

2. 更新yum源为国内镜像(可选)

备份原有的 Yum 仓库配置

sudo cp /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup

编辑 Yum 仓库配置

sudo vim /etc/yum.repos.d/CentOS-Base.repo

将文件中的 mirrorlistbaseurl 行替换为以下内容:

[base]
name=CentOS-$releasever - Base - Aliyun
baseurl=http://mirrors.aliyun.com/centos/$releasever/os/$basearch/
gpgcheck=1
enabled=1
gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CENTOS7

# [updates]
name=CentOS-$releasever - Updates - Aliyun
baseurl=http://mirrors.aliyun.com/centos/$releasever/updates/$basearch/
gpgcheck=1
enabled=1
gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CENTOS7

# [extras]
name=CentOS-$releasever - Extras - Aliyun
baseurl=http://mirrors.aliyun.com/centos/$releasever/extras/$basearch/
gpgcheck=1
enabled=1
gpgkey=http://mirrors.aliyun.com/centos/RPM-GPG-KEY-CENTOS7

离线安装

在线安装源比较慢,且某些网络不能访问,或机器不能访问互联网,可以试试离线安装。

安装包下载路径:Index of /v8-stable/epel-7/x86_64/RPMS (adiscon.com)

下载后放入服务器,执行安装命令,需要下载更新的包如下:

rpm -Uvh libestr-0.1.11-1.el7.x86_64.rpm --nodeps
rpm -Uvh libfastjson4-1.2304.0-1.el7.x86_64.rpm --nodeps
rpm -Uvh rsyslog-8.2404.0-1.el7.x86_64.rpm --nodeps
rpm -Uvh liblognorm5-2.0.6-1.el7.x86_64.rpm --nodeps
rpm -Uvh rsyslog-mmjsonparse-8.2404.0-1.el7.x86_64.rpm --nodeps
rpm -Uvh rsyslog-elasticsearch-8.2404.0-1.el7.x86_64.rpm --nodeps
rpm -Uvh rsyslog-mmnormalize-8.2404.0-1.el7.x86_64.rpm --nodeps

安装 omelasticsearch 和 mmnormalize 模块

首先,确保 omelasticsearch 模块已正确安装。在 CentOS 上,这个模块通常包含在 rsyslog-elasticsearch 包中。你可以运行以下命令来安装它(如果尚未安装,离线更新过的不用重复安装):

sudo yum install -y rsyslog-elasticsearch
sudo yum install -y rsyslog-mmnormalize

确认模块文件存在:

检查 /usr/lib64/rsyslog/ 目录下是否存在 omelasticsearch.sommnormalize.so 文件。这可以通过以下命令完成:

ls -l /usr/lib64/rsyslog/

配置 rsyslog

编辑 rsyslog 配置文件(通常位于 /etc/rsyslog.conf),日志通常会有一些预设配置:

#### RULES ####

# Log all kernel messages to the console.
# Logging much else clutters up the screen.
#kern.*                                                 /dev/console

# Log anything (except mail) of level info or higher.
# Don't log private authentication messages!
*.info;mail.none;authpriv.none;cron.none                /var/log/messages

# The authpriv file has restricted access.
authpriv.*                                              /var/log/secure

# Log all the mail messages in one place.
mail.*                                                  -/var/log/maillog

# Log cron stuff
cron.*                                                  /var/log/cron

# Everybody gets emergency messages
*.emerg                                                 :omusrmsg:*

# Save news errors of level crit and higher in a special file.
uucp,news.crit                                          /var/log/spooler

# Save boot messages also to boot.log
local7.*                                                /var/log/boot.log

这部分是 rsyslog 配置文件中的规则部分,它定义了如何处理和转发接收到的日志消息。每条规则都指定了日志消息的来源和类型,以及应该如何处理这些消息。这里是对这些规则的解释:

预设规则解释

  1. Kernel 日志到控制台:

    #kern.* /dev/console

    这条规则被注释掉了。如果启用,它会将所有内核(kern)相关的消息发送到控制台设备(/dev/console)。通常这样做会使控制台非常杂乱,因此在多数配置中这条规则都是被禁用的。

  2. 普通信息日志到 /var/log/messages,但排除邮件、认证和定时任务相关日志:

    *.info;mail.none;authpriv.none;cron.none /var/log/messages

    这条规则表示将所有信息级别(info)或更高级别的日志写入到 /var/log/messages 文件中,但不包括邮件(mail)、权限相关的认证消息(authpriv)和定时任务(cron)的日志。

  3. 权限认证日志到 /var/log/secure:

    authpriv.* /var/log/secure

    所有与认证权限(authpriv)相关的日志都被记录到 /var/log/secure。这通常包括敏感的认证信息,因此 /var/log/secure 文件的访问权限通常被严格控制。

  4. 邮件日志到 /var/log/maillog:

    mail.* -/var/log/maillog

    所有邮件相关的日志都记录到 /var/log/maillog。这里的 - 符号意味着在写入日志前不进行同步操作,这可能会提高性能但在系统异常时可能丢失最近的日志。

  5. 定时任务日志到 /var/log/cron:

    cron.* /var/log/cron

    所有定时任务(cron)相关的日志都记录到 /var/log/cron

  6. 紧急消息到所有用户:

    *.emerg :omusrmsg:*

    所有紧急级别的日志(emerg)将被发送到所有在线用户。这通常用于系统级的紧急通知。

  7. 新闻和UUCP错误到特定文件:

    uucp,news.crit /var/log/spooler

    uucpnews 子系统中等级为 crit(关键级别)或更高级别的日志被记录到 /var/log/spooler

  8. 特定设施的日志到 boot.log:

    local7.* /var/log/boot.log

    所有设施代码为 local7 的日志都被记录到 /var/log/boot.loglocal7 通常被系统管理员用来记录特定的、自定义的日志信息。

这些规则使得日志信息按照类型和重要性被分类和存储到不同的文件中,以便于日后的查阅和管理。如果你需要调整或优化这些规则以满足特定的日志管理需求,可以根据实际情况修改或添加新的规则。

采集服务器TCP和UDP日志

需要启用 rsyslog 的 imtcp 模块(对 TCP 日志)和 imudp 模块(对 UDP 日志):

module(load="imtcp")
input(type="imtcp" port="514")

module(load="imudp")
input(type="imudp" port="514")

接下来,配置 rsyslog 将日志数据转发到 Elasticsearch。可以通过 omelasticsearch 模块实现这一点:

module(load="omelasticsearch")

# 定义系统日志模板
template(name="plain-syslog"
         type="list") {
           constant(value="{")
             constant(value="\"timestamp\":\"")     property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"host\":\"")        property(name="hostname")
             constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")
             constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")
             constant(value="\",\"tag\":\"")         property(name="syslogtag")
             constant(value="\",\"message\":\"")     property(name="msg" format="json")
           constant(value="\"}\n")
         }

发送日志操作:

action(type="omelasticsearch"
       template="plain-syslog"
       server="192.168.5.1"
       serverport="9200"
       searchIndex="syslog-index" # ES的Index
       searchType="syslog"
       bulkmode="on"
       queue.type="linkedlist"
       queue.size="5000"
       queue.dequeuebatchsize="300"
       action.resumeretrycount="-1"
       )

详细分析这段配置:

  • type="omelasticsearch":指定操作类型为 omelasticsearch,意味着这是一个向 Elasticsearch 发送日志的操作。
  • template="plain-syslog":使用名为 plain-syslog 的模板来格式化日志数据。这个模板应该在配置文件的其他部分定义,描述了日志消息应该如何被格式化成 JSON 或其他格式以适应 Elasticsearch。
  • server="192.168.5.1":日志数据将被发送到 IP 地址为 192.168.5.1 的服务器。
  • serverport="9210":与 Elasticsearch 服务器通信使用的端口号为 9210。
  • searchIndex="syslog-index":Elasticsearch 中用于存储日志的索引名称为 syslog-index
  • searchType="syslog":日志数据在 Elasticsearch 中的类型为 syslog
  • bulkmode="on":启用批量模式,这允许 rsyslog 以批量形式发送日志数据,提高效率。
  • queue.type="linkedlist":定义了用于缓存日志消息的队列类型为链表。
  • queue.size="5000":队列可以存储的最大日志消息数量为 5000。
  • queue.dequeuebatchsize="300":每次从队列中取出的日志消息数量为 300。
  • action.resumeretrycount="-1":如果发送失败,重试次数无限。

重启后,看看es是否有新建 index,是否有数据

sudo systemctl restart rsyslog

检查配置文件是否有误:

sudo rsyslogd -N1

查看错误信息:

sudo rsyslogd -dn

系统日志,遇到未知错误可以查看一下:

journalctl -u rsyslog

如何理解 rsyslog 中的数据流

rsyslog 配置中,action 部分定义了数据如何被处理和发送,但它本身不指定数据来源。数据来源是由其他模块(如 imuxsockimjournalimtcpimudp 等)在配置文件的其他部分指定的。这些模块通过接收日志输入来确定数据来源,然后 action 指令基于这些输入进行处理和转发。

rsyslog 中,数据流通常按以下方式组织:

  1. 输入模块im*):这些模块负责从各种来源接收数据,比如系统日志、网络、文件等。
  2. 规则和条件:配置文件中的规则(通常基于条件表达式)决定了从输入接收的数据如何处理。
  3. 模板:定义了数据的格式化方式。
  4. 动作action):指定了数据发送的目的地,比如 Elasticsearch、文件系统等。

如果要让不同的 action 处理不同来源的数据,需要在 action 之前使用条件来区分数据。

区分不同来源的数据

假设你已经有 TCPUDP输入激活,并且你希望:

  • 将从 TCP 收到的日志发送到一个 Elasticsearch 索引。
  • 将从 UDP 收到的日志发送到另一个 Elasticsearch 索引。

你可以通过条件判断来实现这一点。这里是一个如何设置的示例:

TCP 日志

if $inputname == 'imtcp' then {
    action(type="omelasticsearch"
           template="system-syslog"
           server="192.168.5.1"
           serverport="9210"
           searchIndex="tcp-syslog-index"
           searchType="syslog"
           bulkmode="on")
}

UDP 日志

if $inputname == 'imudp' then {
    action(type="omelasticsearch"
           template="network-syslog"
           server="192.168.5.1"
           serverport="9210"
           searchIndex="udp-syslog-index"
           searchType="syslog"
           bulkmode="on")
}

读取文件日志,存入ES

module(load="imfile" PollingInterval="10", statefile.directory="/etc/rsyslog.d")  # 加载 imfile 模块; PollingInterval 调整轮询间隔,秒; statefile.directory 记录日志状态的文件夹,避免重复采集,确保传入的文件夹已存在

# 定义一个模板,用于格式化发送到 Elasticsearch 的日志
template(name="json-template"
         type="list") {
           constant(value="{")
             constant(value="\"timestamp\":\"")      property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"message\":\"")      property(name="msg" format="json")
             constant(value="\",\"host\":\"")         property(name="hostname")
             constant(value="\",\"severity\":\"")     property(name="syslogseverity-text")
             constant(value="\",\"facility\":\"")     property(name="syslogfacility-text")
             constant(value="\",\"syslogtag\":\"")    property(name="syslogtag")
           constant(value="\"}\n")
         }

# 设置读取指定日志文件的输入模块
input(type="imfile"
      File="/home/logs/info.log"
      Tag="notemi-arrange-log"
      Severity="info"
      Facility="local7")

# 将读取的日志发送到 Elasticsearch
action(type="omelasticsearch"
       template="json-template"
       server="192.168.5.1"
       serverport="9210"
       searchIndex="log-test-index"
       searchType="_doc"  # Elasticsearch 7.x 之后使用 _doc 作为统一文档类型
       bulkmode="on"
       queue.type="linkedlist"
       queue.size="5000"
       queue.dequeuebatchsize="300"
       action.resumeretrycount="-1")

案例1:采集文件多行日志

rsyslog 默认是按行采集,如果我们要采集多行,就需要用配置 input 中的 startmsg.regex

input(type="imfile"
      File="/home/spring/error.log"
      Tag="notemi-arrange-error"
      addMetadata="on"
      startmsg.regex="^\\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3}\\]"  # 正则表达式匹配新日志项的开始
      readTimeout="2")

比如我的日志可能会是:

[2024-06-28 13:50:57.441] [http-nio-9202-exec-32] INFO  c.o.s.s.i.ServiceImpl - [getDataPermission,576] - get user info by id: 1
[2024-06-28 13:50:57.441] [http-nio-9401-exec-2] ERROR c.o.c.s.h.GlobalExceptionHandler - [handleRuntimeException,111] - 请求地址'/test',请求参数{"id": 1},发生未知异常: java.lang.RuntimeException: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'user' doesn't exist
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

很明显第二段的日志是多行的,我们就用 ^\\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3}\\] 来匹配每行的开头(\需要转义,所以是\\),这样采集出的每一条日志,就是以正则匹配内容开头来分割的了。

Rsyslog使用记录.md

正则测试大家可以在 regex101: build, test, and debug regex 测试。

案例2:一个imfile input采集多个文件

可以使用通配符;不能使用通配符的多个文件,就写多个 imfile input

input(type="imfile"
      File="/home/spring/*.log" # 使用通配符
      Tag="spring-error"
      addMetadata="on"
      startmsg.regex="^\\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\\.[0-9]{3}\\]"  # 正则表达式匹配新日志项的开始
      readTimeout="2")

案例3:读取多个文件,分别写入到多个es index中

imfile input 中指定 Releset ,通过 Ruleset 来判断走哪个逻辑

#### 日志1,写入es index-1
input(type="imfile"
      File="/home/spring1/*.log"
      Tag="spring1-log"
      addMetadata="on"
      Ruleset="spring1-parser"
      )

#### 日志2,写入es index-2
input(type="imfile"
      File="/home/spring2/*.log"
      Tag="spring2-log"
      addMetadata="on"
      Ruleset="spring2-parser"
      )

ruleset(name="spring1-parser") {
  # 把日志写入 Elasticsearch spring1-log
  action(type="omelasticsearch"
        template="notemi-log-template"
        server="192.168.5.1"
        serverport="9210"
        searchIndex="spring1-log"
        searchType="_doc"
        bulkmode="on"
        queue.type="linkedlist"
        queue.size="5000"
        queue.dequeuebatchsize="300"
        action.resumeretrycount="-1"
       )
}

ruleset(name="spring2-parser") {
  # 把日志写入 Elasticsearch spring2-log
  action(type="omelasticsearch"
        template="notemi-log-template"
        server="192.168.5.1"
        serverport="9210"
        searchIndex="spring2-log"
        searchType="_doc"
        bulkmode="on"
        queue.type="linkedlist"
        queue.size="5000"
        queue.dequeuebatchsize="300"
        action.resumeretrycount="-1"
       )
}

案例4:解析日志文件内容,提取字段

使用 mmnormalize,编写 rulebase 文件来匹配

例如我的日志格式为:

2024-06-17 18:10:49.139 [INFO] database P0000001560 T0000000000000001560  hlog_sys_destroy, n_logs[1], adjust_sta[0]

我想提取出下面两个字段,存入ES:

时间字段 datetime: 2024-06-17 18:10:49.139

level: INFO

1. 编写rulebase

rulebase 文件一般存放于 /etc/rsyslog.d 文件夹中

根据上面的日志格式可以写出:

vi /etc/rsyslog.d/parse_dm.rb

# 2024-06-17 18:10:49.139 [INFO] database P0000001560 T0000000000000001560  hlog_sys_destroy, n_logs[1], adjust_sta[0]
version: 2
rule=:%date:word% %time:word% [%level:char-to:]%] %msg:rest%

rule 中分别提取出 date, time, level,除了这三部分,其它的都为msg

ps: 本来想直接提取出2024-06-17 18:10:49.139,但是不知道怎么写 - -! 试了很多方法都不行,就这样分别提取,然后在配置文件中拼接了。

怎么调试 rulebase 下一章讲。

2. 调整配置文件

我们在模版 notemi-log-template 中加入自定义字段,property name 设置为 $!time_str$!level 来接收解析出的值。

template(name="notemi-log-template"
         type="list") {
           constant(value="{")
             constant(value="\"collect_time\":\"")     property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time\":\"")          property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time_str\":\"")      property(name="$!time_str") # 定义变量,解析日志内容后赋值
             constant(value="\",\"type\":\"")         property(name="$!type")
             constant(value="\",\"level\":\"")        property(name="$!level") # 定义变量,解析日志内容后赋值
             constant(value="\",\"message\":\"")      property(name="msg" format="json")
             constant(value="\",\"host\":\"")         property(name="hostname")
             constant(value="\",\"severity\":\"")     property(name="syslogseverity-text")
             constant(value="\",\"facility\":\"")     property(name="syslogfacility-text")
             constant(value="\",\"syslogtag\":\"")    property(name="syslogtag")
           constant(value="\"}\n")
         }

#### 日志1,写入es index-1
input(type="imfile"
      File="/home/spring1/*.log"
      Tag="spring1-log"
      addMetadata="on"
      Ruleset="spring1-parser"
      )

ruleset(name="spring1-parser") {
  action(type="mmnormalize"
           rulebase="/etc/rsyslog.d/parse_dm.rb")
  set $!time_str = $!date & ' ' & $!time; # 拼接解析出的值,赋值给time_str
  set $!level = $!level; # 拿到解析出的值
  # 把日志写入 Elasticsearch spring1-log
  action(type="omelasticsearch"
        template="notemi-log-template"
        server="192.168.5.1"
        serverport="9210"
        searchIndex="spring1-log"
        searchType="_doc"
        bulkmode="on"
        queue.type="linkedlist"
        queue.size="5000"
        queue.dequeuebatchsize="300"
        action.resumeretrycount="-1"
       )
}

这时候,存入ES中的 time_strlevel 就有值了

案例5:怎么测试rb文件是否能正常解析日志

安装 liblognorm

直接安装

如果你的本地能直接安装 liblognorm ,且能正常使用 lognormalizer ,那么可以直接使用命令:

cat election.log | lognormalizer -r election.rb -e json

echo '[2024-05-29 11:24:29.209974] INFO Some message with details' | lognormalizer -r election.rb -e json

Docker 安装

Dockerfile:

ROM alpine:3.14

ENV LANG en_US.UTF-8
ENV LANGUAGE en_US.UTF-8
ENV LC_ALL=en_US.UTF-8

RUN apk --no-cache --no-progress update && \
    apk --no-cache --no-progress upgrade

RUN apk add liblognorm

Build 镜像:

docker build -t lognormalizer:latest .

启动容器:

docker run --name lognormalizer --rm -it -v `pwd`/test:/test mdm.olm.com:18088/lognormalizer:latest sh 

这里我是将当前路径下的 test 文件夹挂载到容器内了,里面放日志文件和要测试的 rulebase 文件

进入容器:

docker exec -it lognormalizer sh

进入容器后,就和直接安装的效果一样了,都是使用:

cat election.log | lognormalizer -r election.rb -e json

echo '[2024-05-29 11:24:29.209974] INFO Some message with details' | lognormalizer -r election.rb -e json

测试rulebase

cat election.log | lognormalizer -r election.rb -e json

指定待解析日志文件和 rulebase 文件,执行命令。

如果正常返回了你要解析的字段,就说明解析成功,字段也就正常提取出来了,比如:

{ "msg": "Thread 1 advanced to log sequence 4271 (LGWR switch)\\\\n  Current log# 2 seq# 4271 mem# 0: \/home\/oracle\/app\/oradata\/orcl\/redo02.log", "datetime": "Thu Jun 27 15:01:00 2024" }
{ "msg": "Thread 1 advanced to log sequence 4281 (LGWR switch)\\\\n  Current log# 3 seq# 4281 mem# 0: \/home\/oracle\/app\/oradata\/orcl\/redo03.log", "datetime": "Fri Jun 28 10:00:23 2024" }

但如果返回了 originalmsgunparsed-data 字段(未解析内容),说明解析不对或解析不完整,需要调整 rulebase 文件,比如:

{ "originalmsg": "Thu Jun 27 15:01:00 2024\\nThread 1 advanced to log sequence 4271 (LGWR switch)\\\\n  Current log# 2 seq# 4271 mem# 0: \/home\/oracle\/app\/oradata\/orcl\/redo02.log", "unparsed-data": "Thread 1 advanced to log sequence 4271 (LGWR switch)\\\\n  Current log# 2 seq# 4271 mem# 0: \/home\/oracle\/app\/oradata\/orcl\/redo02.log" }
{ "originalmsg": "Fri Jun 28 10:00:23 2024\\nThread 1 advanced to log sequence 4281 (LGWR switch)\\\\n  Current log# 3 seq# 4281 mem# 0: \/home\/oracle\/app\/oradata\/orcl\/redo03.log", "unparsed-data": "Thread 1 advanced to log sequence 4281 (LGWR switch)\\\\n  Current log# 3 seq# 4281 mem# 0: \/home\/oracle\/app\/oradata\/orcl\/redo03.log" }

rulebase 的调试是个麻烦事,多测试吧。

这里有个官方的示例,可以参考下:liblognorm/rulebases at master · rsyslog/liblognorm (github.com)

案例6:通过input tag来判断不同的日志源

场景:文件日志源不一样,但存入相同的 ES Index 中,需要 index 中某些字段做出区分

例如,在 ES 中加一个字段 type 来区分不同日志文件的来源类型,imfile input 中加入 tag,然后用 tag 来区分。

template(name="notemi-log-template"
         type="list") {
           constant(value="{")
             constant(value="\"collect_time\":\"")     property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time\":\"")          property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time_str\":\"")      property(name="$!time_str")
             constant(value="\",\"type\":\"")         property(name="$!type") # 字段来区分不同的日志类型
             constant(value="\",\"level\":\"")        property(name="$!level")
             constant(value="\",\"message\":\"")      property(name="msg" format="json")
             constant(value="\",\"host\":\"")         property(name="hostname")
             constant(value="\",\"severity\":\"")     property(name="syslogseverity-text")
             constant(value="\",\"facility\":\"")     property(name="syslogfacility-text")
             constant(value="\",\"syslogtag\":\"")    property(name="syslogtag")
           constant(value="\"}\n")
         }

#### 日志1,写入es index-1
input(type="imfile"
      File="/home/spring1/*.log"
      Tag="spring1-log"
      addMetadata="on"
      Ruleset="single-parser"
      )

#### 日志2,写入es index-1
input(type="imfile"
      File="/home/spring2/*.log"
      Tag="spring2-log"
      addMetadata="on"
      Ruleset="single-parser"
      )

ruleset(name="spring-parser") {
  action(type="mmnormalize"
           rulebase="/etc/rsyslog.d/parse_dm.rb")
  set $!time_str = $!date & ' ' & $!time; # 拼接解析出的值,赋值给time_str
  set $!level = $!level; # 拿到解析出的值
  if $programname contains 'spring1-log' then {
    set $!type = "spring1";
  } else if $programname contains 'spring2-log' then {
    set $!type = "spring2";
  }

  # 把日志写入 Elasticsearch spring1-log
  action(type="omelasticsearch"
        template="notemi-log-template"
        server="192.168.5.1"
        serverport="9210"
        searchIndex="spring1-log"
        searchType="_doc"
        bulkmode="on"
        queue.type="linkedlist"
        queue.size="5000"
        queue.dequeuebatchsize="300"
        action.resumeretrycount="-1"
       )
}

案例7:字段格式转换

上面我们提取了时间字符串 2024-6-28 16:11:08,但我们存入ES中,要做按时间筛选,怎么将时间字符串转为时间格式呢?

rsyslog 中不支持这种操作,只能通过其它方式,我这里采用了将日志存入 ES 时经过 ESpipeline,来做时间转换,如果你不是存到 ES,考虑其它日志采集方案吧(logstash, filebeat)。

ES 创建 pipeline

PUT _ingest/pipeline/dm-log-pipeline
{
  "description": "Convert time_str to date and store in time",
  "processors": [
    {
      "date": {
        "field": "time_str",
        "target_field": "time",
        "formats": [
          "yyyy-MM-dd HH:mm:ss.SSS",
          "ISO8601"
        ],
        "timezone": "Asia/Shanghai"
      }
    }
  ]
}
  • formats:指定原时间字符串格式
  • field: 原字段
  • target_field: 目标字段,要存在,我是在创建模版时,设置了一个time字段,取的采集日志时间

更多pipeline使用,不是本文重点,不赘述。

rsyslog 中设置 pipeline,老版本的 omelasticsearch 模块不支持指定 pipeline,请升级版本:

# 把日志写入 Elasticsearch spring1-log
  action(type="omelasticsearch"
        template="notemi-log-template"
        server="192.168.5.1"
        serverport="9210"
        searchIndex="spring1-log"
        searchType="_doc"
        bulkmode="on"
        queue.type="linkedlist"
        queue.size="5000"
        queue.dequeuebatchsize="300"
        action.resumeretrycount="-1"
        pipelineName="notemi-log-pipeline" # 指定pipeline
       )

这样,日志在存到 ES 时,time_str 字段就会转为时间格式,并覆盖到 time 字段了

完整示例

# 启用imtcp,imudp
module(load="imtcp")
input(type="imtcp" port="514")

module(load="imudp")
input(type="imudp" port="514")
# 启用imtcp,imudp END

# 启用es模块
module(load="omelasticsearch")

# 启用mmnormalize
module(load="mmnormalize")

module(load="imfile" PollingInterval="10" statefile.directory="/etc/rsyslog.d")  # 加载 imfile 模块, 调整轮询间隔根据需要

# 设置sys日志模版
template(name="plain-syslog"
         type="list") {
           constant(value="{")
             constant(value="\"collect_time\":\"")     property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time\":\"")            property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time_str\":\"")      property(name="$!time_str" dateFormat="rfc3339")
             constant(value="\",\"type\":\"")         property(name="syslogfacility-text")
             constant(value="\",\"level\":\"")       property(name="syslogseverity-text")
             constant(value="\",\"host\":\"")        property(name="hostname")
             constant(value="\",\"severity\":\"")    property(name="syslogseverity-text")
             constant(value="\",\"facility\":\"")    property(name="syslogfacility-text")
             constant(value="\",\"syslogtag\":\"")         property(name="syslogtag")
             constant(value="\",\"message\":\"")     property(name="msg" format="json")
           constant(value="\"}\n")
         }

# 发送 linux-syslog 到es
action(type="omelasticsearch"
       template="plain-syslog"
       server="192.168.5.1"
       serverport="9210"
       searchIndex="linux-192.168.5.1-log"
       searchType="syslog"
       bulkmode="on"
       queue.type="linkedlist"
       queue.size="5000"
       queue.dequeuebatchsize="300"
       action.resumeretrycount="-1")

# 配置项目日志模板
template(name="notemi-log-template"
         type="list") {
           constant(value="{")
             constant(value="\"collect_time\":\"")     property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time\":\"")          property(name="timereported" dateFormat="rfc3339")
             constant(value="\",\"time_str\":\"")      property(name="$!time_str")
             constant(value="\",\"type\":\"")         property(name="$!type")
             constant(value="\",\"level\":\"")        property(name="$!level")
             constant(value="\",\"message\":\"")      property(name="msg" format="json")
             constant(value="\",\"host\":\"")         property(name="hostname")
             constant(value="\",\"severity\":\"")     property(name="syslogseverity-text")
             constant(value="\",\"facility\":\"")     property(name="syslogfacility-text")
             constant(value="\",\"syslogtag\":\"")    property(name="syslogtag")
           constant(value="\"}\n")
         }

#### opengauss数据库日志
input(type="imfile"
      File="/home/opengauss/master_data/pg_log/postgresql-*.log"
      Tag="opengauss-log"
      addMetadata="on"
      Ruleset="multi-line-parser"
      startmsg.regex="^\\[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2}.[[:digit:]]{3}\\"  # 正则表达式匹配新日志项的开始 2024-06-27 00:00:59.727
      )

ruleset(name="multi-line-parser") {

  if $programname contains 'opengauss-log' then {
    set $!type = "opengauss-log";
    action(type="mmnormalize"
           rulebase="/etc/rsyslog.d/parse_opengauss.rb")
    set $!time_str = $!date & ' ' & $!time;
    set $!level = $!level;
  }

  # 把日志写入 Elasticsearch notemi-log
  action(type="omelasticsearch"
        template="notemi-log-template"
        server="192.168.5.1"
        serverport="9210"
        searchIndex="opengauss-192.168.5.1-log"
        searchType="_doc"
        bulkmode="on"
        queue.type="linkedlist"
        queue.size="5000"
        queue.dequeuebatchsize="300"
        action.resumeretrycount="-1"
        pipelineName="opengauss-log-pipeline"
       )
}

错误记录

could not load module 'mmnormalize', errors: trying to load module /usr/lib64/rsyslog/mmnormalize.so: /usr/lib64/rsyslog/mmnormalize.so: undefined symbol: ln_loadSamplesFromString [v8.2404.0 try https://www.rsyslog.com/e/2066 ]

  1. 如果 /usr/lib64/rsyslog/ 文件夹下没有 mmnormalize.so 文件

安装 rsyslog-mmnormalize

sudo yum install -y rsyslog-mmnormalize
  1. 如果 /usr/lib64/rsyslog/ 文件夹下有 mmnormalize.so 文件

安装 liblognorm5

sudo yum install -y liblognorm5

ES中或目标存储日志路径没数据

  • 检查 imfile input File 文件是否存在

  • 如果使用了 startmsg.regex,检查正则表达式是否正确
  • 如果使用了 rulebase,检查 rulebase 配置是否正确

参考文档

官方文档:Configuration — Rsyslog documentation

安装说明:Installing rsyslog from Package — Rsyslog documentation