Pipeline 配置
Pipeline 是 GreptimeDB 中对 log 数据进行解析和转换的一种机制,由一个唯一的名称和一组配置规则组成,这些规则定义了如何对日志数据进行格式化、拆分和转换。目前我们支持 JSON(application/json
或者 application/x-ndjson
,推荐后者)和纯文本(text/plain
)格式的日志数据作为输入。
这些配置以 YAML 格式提供,使得 Pipeline 能够在日志写入过程中,根据设定的规则对数据进行处理,并将处理后的数据存储到数据库中,便于后续的结构化查询。
整体结构
Pipeline 由四部分组成:Processors、Dispatcher、Transform 和 Table suffix。 Processors 对数据进行预处理。 Dispatcher 可以将 pipeline 执行上下文转发到不同的后续 pipeline 上。 Transform 决定最终保存在数据库中的数据类型和表结构。 Table suffix 支持将数据保存到不同的表中。
- Processor 用于对 log 数据进行预处理,例如解析时间字段,替换字段等。
- Dispatcher(可选) 用于将执行上下文转发到另一个 pipeline,同一个输入批次的数据可以基于特定的值被不同的 pipeline 进行处理。
- Transform(可选) 用于对数据进行格式转换,例如将字符串类型转换为数字类型,并且指定数据库表中列的索引信息。
- Table suffix(可选) 用于将数据存储到不同的表中,以便后续使用。
一个包含 Processor 和 Transform 的简单配置示例如下:
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
dispatcher:
field: type
rules:
- value: http
table_suffix: http
pipeline: http
- value: db
table_suffix: db
transform:
- fields:
- string_field_a
- string_field_b
type: string
# 写入的数据必须包含 timestamp 字段
- fields:
- reqTimeSec, req_time_sec
# epoch 是特殊字段类型,必须指定精度
type: epoch, ms
index: timestamp
table_suffix: _${string_field_a}
Processor
Processor 用于对 log 数据进行预处理,其配置位于 YAML 文件中的 processors
字段下。
Pipeline 会按照多个 Processor 的顺序依次加工数据,每个 Processor 都依赖于上一个 Processor 处理的结果。
Processor 由一个 name 和多个配置组成,不同类型的 Processor 配置有不同的字段。
我们目前内置了以下几种 Processor:
date
: 解析格式化的时间字符串字段,例如2024-07-12T16:18:53.048
。epoch
: 解析数字时间戳字段,例如1720772378893
。decolorize
: 移除日志数据中的 ANSI 颜色代码。dissect
: 对 log 数据字段进行拆分。gsub
: 对 log 数据字段进行替换。join
: 对 log 中的 array 类型字段进行合并。letter
: 对 log 数据字段进行字母转换。regex
: 对 log 数据字段进行正则匹配。urlencoding
: 对 log 数据字段进行 URL 编解码。csv
: 对 log 数据字段进行 CSV 解析。json_path
: 从 JSON 数据中提取字段。json_parse
: 将一个字段解析成 JSON 对象。simple_extract
: 使用简单的 key 从 JSON 数据中提取字段。digest
: 提取日志消息模板。
大多数 Processor 都有 field
或 fields
字段,用于指定需要被处理的字段。大部分 Processor 处理完成后会覆盖掉原先的 field。如果你不想影响到原数据中的对应字段,我们可以把结果输出到其他字段来避免覆盖。
当字段名称包含 ,
时,该字段将被重命名。例如,reqTimeSec, req_time_sec
表示将 reqTimeSec
字段重命名为 req_time_sec
,处理完成后的数据将写入中间状态的 req_time_sec
字段中。原始的 reqTimeSec
字段不受影响。如果某些 Processor 不支持字段重命名,则重命名字段名称将被忽略,并将在文档中注明。
例如:
processors:
- letter:
fields:
- message, message_upper
method: upper
ignore_missing: true
message
字段将被转换为大写并存储在 message_upper
字段中。
date
date
Processor 用于解析时间字段。示例配置如下:
processors:
- date:
fields:
- time
formats:
- '%Y-%m-%d %H:%M:%S%.3f'
ignore_missing: true
timezone: 'Asia/Shanghai'
如上所示,date
Processor 的配置包含以下字段:
fields
: 需要解析的时间字段名列表。formats
: 时间格式化字符串,支持多个时间格式化字符串。按照提供的顺序尝试解析,直到解析成功。你可以在这里找到格式化的语法说明。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。timezone
:时区。使用tz_database 中的时区标识符来指定时区。默认为UTC
。
epoch
epoch
Processor 用于解析时间戳字段,示例配置如下:
processors:
- epoch:
fields:
- reqTimeSec
resolution: millisecond
ignore_missing: true
如上所示,epoch
Processor 的配置包含以下字段:
fields
: 需要解析的时间戳字段名列表。resolution
: 时间戳精度,支持s
,sec
,second
,ms
,millisecond
,milli
,us
,microsecond
,micro
,ns
,nanosecond
,nano
。默认为ms
。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
decolorize
decolorize
Processor 用于移除日志数据中的 ANSI 颜色代码。示例配置如下:
processors:
- decolorize:
fields:
- message
如上所示,decolorize
Processor 的配置包含以下字段:
fields
: 需要移除颜色代码的字段名列表。
dissect
dissect
Processor 用于对 log 数据字段进行拆分,示例配置如下:
processors:
- dissect:
fields:
- message
patterns:
- '%{key1} %{key2}'
ignore_missing: true
append_separator: '-'
如上所示,dissect
Processor 的配置包含以下字段:
fields
: 需要拆分的字段名列表。不支持字段重命名。patterns
: 拆分的 dissect 模式。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。append_separator
: 对于多个追加到一起的字段,指定连接符。默认是一个空字符串。
Dissect 模式
和 Logstash 的 Dissect 模式类似,Dissect 模式由 %{key}
组成,其中 %{key}
为一个字段名。例如:
"%{key1} %{key2} %{+key3} %{+key4/2} %{key5->} %{?key6}"
Dissect 修饰符
Dissect 模式支持以下修饰符:
修饰符 | 说明 | 示例 |
---|---|---|
+ | 将两个或多个字段追加到一起 | %{+key} %{+key} |
+ 和 /n | 按照指定的顺序将两个或多个字段追加到一起 | %{+key/2} %{+key/1} |
-> | 忽略右侧的任何重复字符 | %{key1->} %{key2->} |
? | 忽略匹配的值 | %{?key} |
dissect
示例
例如,对于以下 log 数据:
"key1 key2 key3 key4 key5 key6"
使用以下 Dissect 模式:
"%{key1} %{key2} %{+key3} %{+key3/2} %{key5->} %{?key6}"
将得到以下结果:
{
"key1": "key1",
"key2": "key2",
"key3": "key3 key4",
"key5": "key5"
}
gsub
gsub
Processor 用于对 log 数据字段进行替换,示例配置如下:
processors:
- gsub:
fields:
- message
pattern: 'old'
replacement: 'new'
ignore_missing: true
如上所示,gsub
Processor 的配置包含以下字段:
fields
: 需要替换的字段名列表。pattern
: 需要替换的字符串。支持正则表达式。replacement
: 替换后的字符串。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
join
join
Processor 用于对 log 中的 Array 类型字段进行合并,示例配置如下:
processors:
- join:
fields:
- message
separator: ','
ignore_missing: true
如上所示,join
Processor 的配置包含以下字段:
fields
: 需要合并的字段名列表。注意,这里每行字段的值需要是 Array 类型,每行字段会单独合并自己数组内的值,所有行的字段不会合并到一起。separator
: 合并后的分隔符。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
join
示例
例如,对于以下 log 数据:
{
"message": ["a", "b", "c"]
}
使用以下配置:
processors:
- join:
fields:
- message
separator: ','
将得到以下结果:
{
"message": "a,b,c"
}
letter
letter
Processor 用于对 log 数据字段进行字母转换,示例配置如下:
processors:
- letter:
fields:
- message
method: upper
ignore_missing: true
如上所示,letter
Processor 的配置包含以下字段:
fields
: 需要转换的字段名列表。method
: 转换方法,支持upper
,lower
,capital
。默认为lower
。注意capital
只会将第一个字母转换为大写。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
regex
regex
Processor 用于对 log 数据字段进行正则匹配,示例配置如下:
processors:
- regex:
fields:
- message
patterns:
- ':(?<id>[0-9])'
ignore_missing: true
如上所示,regex
Processor 的配置包含以下字段:
fields
: 需要匹配的字段名列表。如果重命名了字段,重命名后的字段名将与pattern
中的命名捕获组名进行拼接。pattern
: 要进行匹配的正则表达式,需要使用命名捕获组才可以从对应字段中取出对应数据。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
regex 命名捕获组的规则
regex
Processor 支持使用 (?<group-name>...)
的语法来命名捕获组,最终将数据处理为这种形式:
{
"<field-name>_<group-name>": "<value>"
}
例如 regex
Processor 中 field 填写的字段名为 message
,对应的内容为 "[ERROR] error message"
,
你可以将 pattern 设置为 \[(?<level>[A-Z]+)\] (?<content>.+)
,
最终数据会被处理为:
{
"message_level": "ERROR",
"message_content": "error message"
}
urlencoding
urlencoding
Processor 用于对 log 数据字段进行 URL 编码,示例配置如下:
processors:
- urlencoding:
fields:
- string_field_a
- string_field_b
method: decode
ignore_missing: true
如上所示,urlencoding
Processor 的配置包含以下字段:
fields
: 需要编码的字段名列表。method
: 编码方法,支持encode
,decode
。默认为encode
。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
csv
csv
Processor 用于对 log 数据中没有携带 header 的 CSV 类型字段解析,示例配置如下:
processors:
- csv:
fields:
- message
separator: ','
quote: '"'
trim: true
ignore_missing: true
如上所示,csv
Processor 的配置包含以下字段:
fields
: 需要解析的字段名列表。separator
: 分隔符。quote
: 引号。trim
: 是否去除空格。默认为false
。ignore_missing
: 忽略字段不存在的情况。默认为false
。如果字段不存在,并且此配置为 false,则会抛出异常。
json_path
(实验性)
注意:json_path
处理器目前处于实验阶段,可能会有所变动。
json_path
处理器用于从 JSON 数据中提取字段。以下是一个配置示例:
processors:
- json_path:
fields:
- complex_object
json_path: "$.shop.orders[?(@.active)].id"
ignore_missing: true
result_index: 1
在上述示例中,json_path
processor 的配置包括以下字段:
fields
:要提取的字段名称列表。json_path
:要提取的 JSON 路径。ignore_missing
:忽略字段缺失的情况。默认为false
。如果字段缺失且此配置设置为false
,将抛出异常。result_index
:指定提取数组中要用作结果值的下标。默认情况下,包含所有值。Processor 提取的结果值是包含 path 中所有值的数组。如果指定了索引,将使用提取数组中对应的下标的值作为最终结果。
JSON 路径语法
JSON 路径语法基于 jsonpath-rust 库。
在此阶段,我们仅推荐使用一些简单的字段提取操作,以便将嵌套字段提取到顶层。
json_path
示例
例如,给定以下日志数据:
{
"product_object": {
"hello": "world"
},
"product_array": [
"hello",
"world"
],
"complex_object": {
"shop": {
"orders": [
{
"id": 1,
"active": true
},
{
"id": 2
},
{
"id": 3
},
{
"id": 4,
"active": true
}
]
}
}
}
使用以下配置:
processors:
- json_path:
fields:
- product_object, object_target
json_path: "$.hello"
result_index: 0
- json_path:
fields:
- product_array, array_target
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_1
json_path: "$.shop.orders[?(@.active)].id"
- json_path:
fields:
- complex_target_1, complex_target_2
json_path: "$.[1]"
result_index: 0
- json_path:
fields:
- complex_object, complex_target_3
json_path: "$.shop.orders[?(@.active)].id"
result_index: 1
transform:
- fields:
- object_target
- array_target
type: string
- fields:
- complex_target_3
- complex_target_2
type: uint32
- fields:
- complex_target_1
type: json
结果将是:
{
"object_target": "world",
"array_target": "world",
"complex_target_3": 4,
"complex_target_2": 4,
"complex_target_1": [1, 4]
}
json_parse
json_parse
,如其名字所示,将一个字符串解析成一个 JSON 对象。以下是一份示例配置:
processors:
- json_parse:
fields:
- complex_object, target_field
ignore_missing: true
在上述示例中,json_parse
处理器的配置包含以下字段:
fields
:对于每个字段,第一个 key 表示输入字符串中的 key,第二个 key 表示输出的 key 名称。如果不填写第二个 key,则默认使用第一个 key 名作为输出名,也就是替换第一个 key 中的值。上面的示例表示解析complex_object
并将输出放入target_field
中。ignore_missing
:忽略字段不存在的情况。默认为false
。如果字段不存在且此配置为false
,将抛出异常。
json_parse
example
例如,给定以下日志数据:
{
"product_object": "{\"hello\":\"world\"}",
}
使用以下配置:
processors:
- json_parse:
fields:
- product_object
结果将是:
{
"product_object": {
"hello": "world"
}
}
simple_extract
虽然 json_path
处理器能够使用复杂表达式从 JSON 对象中提取字段,但它相对较慢且成本较高。simple_extract
处理器提供了一种简单的方法,仅使用键名来提取字段。以下是示例配置:
processors:
- simple_extract:
fields:
- complex_object, target_field
key: "shop.name"
ignore_missing: true
在上述示例中,simple_extract
处理器的配置包含以下字段:
fields
:对于每个字段,第一个键表示上下文中的输入 JSON 对象,第二个键表示输出键名。上面的示例表示从complex_object
中提取数据并将输出放入target_field
中。key
:提取键,使用x.x
格式,每个.
表示一个新的嵌套层。ignore_missing
:忽略字段不存在的情况。默认为false
。如果字段不存在且此配置为false
,将抛出异常。
simple_extract
示例
例如,给定以下日志数据:
{
"product_object": {
"hello": "world"
},
"product_array": [
"hello",
"world"
],
"complex_object": {
"shop": {
"name": "some_shop_name"
}
}
}
使用以下配置:
processors:
- simple_extract:
fields:
- complex_object, shop_name
key: "shop.name"
transform:
- fields:
- shop_name
type: string
结果将是:
{
"shop_name": "some_shop_name"
}
digest
digest
处理器用于从日志内容中提取日志模板,它通过识别并移除可变内容(如数字、UUID、IP 地址、引号中的内容和括号中的文本等)来实现。提取出的模板可用于日志的分类和分析。配置示例如下:
processors:
- digest:
fields:
- message
presets:
- numbers
- uuid
- ip
- quoted
- bracketed
regex:
- "foobar"
ignore_missing: true
在上述示例中,digest
处理器的配置包含以下字段:
fields
:要进行摘要处理的字段名列表。处理后的结果将存储在带有_digest
后缀的新字段中。presets
:要移除的预设模式列表。支持以下模式:numbers
:匹配数字序列uuid
:匹配 UUID 字符串,如123e4567-e89b-12d3-a456-426614174000
ip
:匹配 IPv4/IPv6 地址(可选带端口号)quoted
:匹配单引号/双引号内的文本(包括各种 Unicode 引号)bracketed
:匹配各种类型括号内的文本(包括各种 Unicode 括号)
regex
:要移除的自定义正则表达式列表ignore_missing
:是否忽略字段不存在的情况。默认为false
。如果字段不存在且此配置为false
,将抛出异常。
digest
示例
例如,给定以下日志数据:
{
"message": "User 'john.doe' from [192.168.1.1] accessed resource 12345 with UUID 123e4567-e89b-12d3-a456-426614174000"
}
使用以下配置:
processors:
- digest:
fields:
- message
presets:
- numbers
- uuid
- ip
- quoted
- bracketed
结果将是:
{
"message": "User 'john.doe' from [192.168.1.1] accessed resource 12345 with UUID 123e4567-e89b-12d3-a456-426614174000",
"message_digest": "User from accessed resource with UUID "
}
提取的模板可用于对相似的日志消息进行分组或分析日志模式,即使可变内容不同。例如,以下所有日志消息都会生成相同的模板:
User 'alice' from [10.0.0.1] accessed resource 54321 with UUID 987fbc97-4bed-5078-9141-2791ba07c9f3
User 'bob' from [2001:0db8::1] accessed resource 98765 with UUID 550e8400-e29b-41d4-a716-446655440000
Transform
Transform 用于对 log 数据进行转换,并且指定在数据库表中列的索引信息。其配置位于 YAML 文件中的 transform
字段下。
从 v0.15
开始,GreptimeDB 新增了一种自动 transform 模式用于简化配置。具体详情见下。
Transform 由一个或多个配置组成,每个配置包含以下字段:
fields
: 需要转换的字段名列表type
: 转换类型index
: 索引类型(可选)tag
: 设置列为 tag(可选)on_failure
: 转换失败时的处理方式(可选)default
: 默认值(可选)
自动 transform
如果 pipeline 的配置中没有 transform,那么 pipeline 引擎会尝试为上下文中的字段自动推导类型并将其保存到数据库中,该行为类似 identity
pipeline 的效果。
当在 GreptimeDB 中创建表时,必须指定一个时间索引列。在这个场景中,pipeline 引擎会尝试从上下文中寻找一个 timestamp
类型的字段,并将其设置成时间索引列。timestamp
类型的字段是 date
或者 epoch
processor 的产物。所以在 processors 声明中,必须存在一个
date
或者 epoch
processor。同时,只允许存在一个 timestamp
类型的字段,多个 timestamp
字段的情况下会因无法判断在哪一列上设置时间索引而报错。
例如,以下 pipeline 配置现在是有效的。
processors:
- dissect:
fields:
- message
patterns:
- '%{ip_address} - %{username} [%{timestamp}] "%{http_method} %{request_line} %{protocol}" %{status_code} %{response_size}'
ignore_missing: true
- date:
fields:
- timestamp
formats:
- "%d/%b/%Y:%H:%M:%S %z"
其产生的表结构如下:
mysql> desc auto_trans;
+---------------+---------------------+------+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+---------------+---------------------+------+------+---------+---------------+
| timestamp | TimestampNanosecond | PRI | NO | | TIMESTAMP |
| host | String | | YES | | FIELD |
| http_method | String | | YES | | FIELD |
| ip_address | String | | YES | | FIELD |
| message | String | | YES | | FIELD |
| protocol | String | | YES | | FIELD |
| request_line | String | | YES | | FIELD |
| response_size | String | | YES | | FIELD |
| service | String | | YES | | FIELD |
| source_type | String | | YES | | FIELD |
| status_code | String | | YES | | FIELD |
| username | String | | YES | | FIELD |
+---------------+---------------------+------+------+---------+---------------+
12 rows in set (0.03 sec)
可以看到所有的字段都被保存下来了,包括原始的 message
字段。注意所有的字段都被保存成 String
类型,timestamp
字段自动被设置成时间索引列。
fields
字段
每个字段名都是一个字符串,当字段名称包含 ,
时,会进行字段重命名。例如,reqTimeSec, req_time_sec
表示将 reqTimeSec
字段重命名为 req_time_sec
,
最终数据将被写入到 GreptimeDB 的 req_time_sec
列。
type
字段
GreptimeDB 目前内置了以下几种转换类型:
int8
,int16
,int32
,int64
: 整数类型。uint8
,uint16
,uint32
,uint64
: 无符号整数类型。float32
,float64
: 浮点数类型。string
: 字符串类型。time
: 时间类型。将被转换为 GreptimeDBtimestamp(9)
类型。epoch
: 时间戳类型。将被转换为 GreptimeDBtimestamp(n)
类型。n 为时间戳精度,n 的值视 epoch 精度而定。当精度为s
时,n 为 0;当精度为ms
时,n 为 3;当精度为us
时,n 为 6;当精度为ns
时,n 为 9。
如果字段在转换过程中获得了非法值,Pipeline 将会抛出异常。例如将一个字符串 abc
转换为整数时,由于该字符串不是一个合法的整数,Pipeline 将会抛出异常。
index
字段
Pipeline
会将处理后的数据写入到 GreptimeDB 自动创建的数据表中。为了提高查询效率,GreptimeDB 会为表中的某些列创建索引。index
字段用于指定哪些字段需要被索引。关于 GreptimeDB 的索引类型,请参考数据索引文档。
GreptimeDB 支持以下四种字段的索引类型:
timestamp
: 用于指定某列是时间索引列inverted
: 用于指定某列使用 inverted 类型的索引(倒排索引)fulltext
: 用于指定某列使用 fulltext 类型的索引(全文索引),该列需要是字符串类型skipping
: 用于指定某列使用 skipping 类型的索引(跳数索引),该列需要是字符串类型
不提供 index
字段时,GreptimeDB 将不会在该字段上建立索引。
在 GreptimeDB 中,一张表里必须包含一个 timestamp
类型的列作为该表的时间索引列,因此一个 Pipeline 有且只有一个时间索引列。
时间戳列
通过 index: timestamp
指定哪个字段是时间索引列,写法请参考下方的 Transform 示例。
Inverted 索引
通过 index: inverted
指定在哪个列上建立倒排索引,写法请参考下方的 Transform 示例。
Fulltext 索引
通过 index: fulltext
指定在哪个列上建立全文索引,该索引可大大提升 日志搜索 的性能,写法请参考下方的 Transform 示例。
Skipping 索引
通过 index: skipping
指定在哪个列上建立跳数索引,该索引只需少量存储空间的索引文件即可以加速在高基数列上的查询,写法请参考下方的 Transform 示例。
tag
字段
Pipeline
支持手动指定 tag 列。关于 tag 列的更多信息,请参考数据模型文档。写法请参考下方的 Transform 示例。
on_failure
字段
on_failure
字段用于指定转换失败时的处理方式,支持以下几种方式:
ignore
: 忽略转换失败的字段,不写入数据库。default
: 写入默认值。默认值由default
字段指定。
default
字段
default
字段用于指定转换失败时的默认值。
Transform 示例
例如,对于以下 log 数据:
{
"num_field_a": "3",
"string_field_a": "john",
"string_field_b": "It was snowing when he was born.",
"time_field_a": 1625760000
}
使用以下配置:
transform:
- fields:
- string_field_a, name
type: string
index: skipping
tag: true
- fields:
- num_field_a, age
type: int32
index: inverted
- fields:
- string_field_b, description
type: string
index: fulltext
- fields:
- time_field_a, born_time
type: epoch, s
index: timestamp
将得到以下结果:
{
"name": "john",
"age": 3,
"description": "It was snowing when he was born.",
"born_time": 2021-07-08 16:00:00
}
Dispatcher
Dispatcher 允许用户将数据路由到其他 Pipeline 上。这是为了应对当多种日志类型共享 单一来源且需要存储在不同结构的单独表中。
配置例子如下:
dispatcher:
field: type
rules:
- value: http
table_suffix: http
pipeline: http
- value: db
table_suffix: db
Dispatcher 在 processor 之后执行。当匹配到相应的规则时,下一个 pipeline 将被执行。
你可以指定路由数据所依据的字段名 field
,并指定路由规则 rules
。假如 field
字段匹配到规则中的 value
,数据将被路由到 pipeline
。如果规则中没有指定
pipeline
,将会根据当前的数据结构推断表结构。在上面的例子里,如果用户输入的数据
中 type
字段的值为 http
,我们将把数据路由给名为 http
的 pipeline 执行。如
果 type
字段的值为 db
,我们将用当前数据的结构作为表结构存储。
写入的目标表名由 table_suffix
指定,这个后缀将和请求输入的 table
参数及下划
线组合形成最终的表名。例如,请求的表名叫做 applogs
,当匹配到上面例子中的
http
规则时,最终的表名叫做 applogs_http
。
如果没有规则匹配到,数据将执行当前 pipeline 中定一个 transform 规则。
Table suffix
此实验性功能可能存在预期外的行为,其功能未来可能发生变化。
在一些场景下,你可能需要将写入的日志数据基于输入的字段值保存到不同表上。 比如你可能希望按照产生的应用名将日志保存到不同的表上,在表名上添加这个应用名的后缀。
一个配置示例如下:
table_suffix: _${app_name}
语法非常简单: 使用 ${}
来引用 pipeline 执行上下文中的变量。
该变量可以是输入数据中直接存在的,也可以是前序处理流程中产生的。
变量替换完成之后,整个字符串会被添加到输入的表名后。
注意:
- 引用的变量必须是一个整数或者字符串类型的数据
- 如果在执行过程中遇到任何错误(例如变量不存在或者无效数据类型),输入的表名会被作为最终表名
下面举一个例子。以下是输入数据:
[
{"type": "db"},
{"type": "http"},
{"t": "test"}
]
输入的表名为 persist_app
,pipeline 配置如下:
table_suffix: _${type}
这三行输入数据会被写入到三张不同的表中:
persist_app_db
persist_app_http
persist_app
, 因为输入的数据中并没有type
字段,所以使用了默认的表名