跳至主内容

流水线

流水线是 安全数据湖 日志消息处理系统的核心组件,为评估、修改和路由传入数据提供结构化框架。它们定义了消息摄取后应用的处理步骤序列,确保日志数据得到一致、高效且定制化的处理。

每条流水线由一系列 规则 组成,这些规则可关联至 数据流 ,并且可以链接到一个或多个数据流。这种连接方式使您能够精确控制特定消息的处理方式和时机,从而实现对数据丰富化、规范化和路由的细粒度管控。

提取器

提取器 安全数据湖 的遗留功能,最初用于在日志消息摄入时进行处理和解析。建议改用更健壮、可定制的管道功能进行集中式消息处理。管道具有灵活性,支持条件逻辑,并能跨多个输入源运作。

核心概念

管道

管道是由多个规则按阶段组织而成的集合。当绑定到数据流时,进入该流的消息会按照预定义的阶段顺序通过所有连接的管道进行处理。

管道规则

管道规则定义了消息处理逻辑。

它们可以:

  • 将消息路由到不同数据流

  • 通过添加或修改字段实现数据丰富化

  • 转换消息内容

  • 规范化消息格式以实现统一搜索与分析

一组协同运作的关联规则构成完整的管道工作流。

函数

函数 是管道规则的基础构件。每个函数执行特定操作(如解析文本、检查字段值或更改消息内容),并可接受参数控制其行为。 函数返回的结果会影响后续规则对消息的处理方式。

阶段

管道划分为多个阶段,每个阶段包含一个或多个规则。阶段按数字顺序依次执行。

所有相同优先级的阶段会在所有连接的管道中并行执行。这种结构支持构建多步骤工作流,例如:在一个阶段解析消息,在另一阶段进行数据丰富化,最终在最后阶段完成路由。

数据流

每个管道必须至少连接一个 数据流 ,这决定了管道处理哪些消息。

进入数据流的消息会触发所有连接管道的执行。详见数据流说明文档。

管道规则逻辑

安全数据湖中的管道 是由管道规则构建的,这些规则定义了日志消息在被索引或存储之前如何被检查、转换和路由。 这些规则使用专用的领域专用语言(DSL),该语言提供了受控且易读的语法来定义处理逻辑,同时保持强大的运行时性能。

每条规则结合了一个条件和一个 动作 :

  • 条件决定规则何时适用。

  • 动作指定当条件满足时发生的操作。

理解数据类型对于编写规则至关重要。数据类型定义了字段所保存的值的种类(例如字符串、数字或IP地址)以及该值在规则中如何被操作。

管道规则是通过使用函数构建的——这些预定义的方法执行特定任务,如转换数据类型、操作字符串、解析JSON或检索 查找表 数据。 安全数据湖 包含了广泛的内置函数,帮助你有效地丰富、转换和管理日志数据。

规则可以通过 规则构建器 界面交互式创建和测试,或者由高级用户在源码编辑器中手动编写。

示例管道

下面的示例展示了管道及其规则的内部结构:

pipeline "My new pipeline"
stage 1 match all
  rule "has firewall fields";
  rule "from firewall subnet";
stage 2 match either
  rule "geocode IPs";
  rule "anonymize source IPs";
end

该管道定义了两个阶段:

  • 阶段1仅在所有列出的规则都为真时执行。

  • 阶段2如果其中任一规则匹配,则执行(类似于或条件)。

阶段按升序运行,每个阶段可以引用可重用的规则。这允许模块化设计:像 hasfirewallfields 这样的规则可以在多个管道中共享,避免重复。

示例规则

以下是管道中引用的两个示例规则:

规则1

rule "has firewall fields"
when
    has_field("src_ip") && has_field("dst_ip")
then
end

规则2

rule "来自防火墙子网"
when
    cidr_match("10.10.10.0/24", to_ip($message.gl2_remote_ip))
then
end

两条规则均使用内置函数定义条件:

  • has_field() 检查是否存在特定消息字段。

  • cidr_match() 评估IP地址是否属于指定子网范围。

to_ip() 转换确保值被解析为IP地址而非字符串,体现了 安全数据湖 对规则验证的严格类型约束。

这些规则不包含动作( then 为空),因其仅用于控制管道流程。

条件

规则的 when 子句是针对每条消息评估的布尔表达式。

支持逻辑运算符 AND( && )、OR( || )和 NOT( ! ),以及比较运算符如 < , <= , > , >= , == ,以及 != .

例如:

has_field("src_ip") && cidr_match("10.0.0.0/8", to_ip($message.src_ip))

若条件引用了不存在的函数,则自动判定为 false .

比较字段时,需确保两者类型相同,例如:

to_string($message.src_ip) == to_string($message.dst_ip)

操作

当条件判定为 then 时, true .

操作可以是:

  • 函数调用,如 set_field("type","firewall_log");

  • 变量赋值,如 letsubnet=to_string($message.network);

变量可用于存储临时值、避免重复计算并提升规则可读性。

保留字

规则语言中的特定标记为保留字,不可用作变量名,包括:

  • 所有

  • Either

  • Pass

  • And

  • Or

  • Not

  • Pipeline

  • Rule

  • During

  • Stage

  • When

  • Then

  • End

  • Let

  • Match

例如:

let match = regex(a,b);

将因match是保留字而失败

数据类型

安全数据湖 在管道规则中强制类型安全以防止无效操作,支持以下内置数据类型:

数据类型

描述

字符串

UTF-8文本值

双精度浮点数

浮点数(Java Double类型)

长整型

整型数字(Java Long类型)

布尔型

真或假值

void

无返回值的函数

ip

IP地址(InetAddress的子集)

插件可定义额外类型。以 to_ 为前缀的转换函数(例如 to_string()

, to_ip() , to_long() )确保类型正确处理。完整函数列表请参阅 函数参考手册 .

注意

在比较或使用函数前,始终将消息字段转换为正确类型。 例如:

set_field("timestamp", to_string(`$message.@extracted_timestamp`));

构建流水线规则

流水线由规则定义,这些规则决定消息通过 安全数据湖 时的处理方式。每条规则通过条件与动作的组合,使您能基于特定标准过滤、丰富、转换或路由日志数据。

创建规则时,需使用规则构建器或源代码编辑器定义逻辑,将“when”条件与“then”动作结合,精确描述消息处理方式。规则创建后,可加入流水线、分阶段组织并连接至数据流——实现从摄取到存储的灵活自动化消息处理控制。

本文概述创建和管理流水线规则的流程。

配置消息处理器

开始构建流水线规则前,请确保 消息处理器 已启用并正确配置:

  1. G 前往 系统 > 配置 .

  2. 选择 消息处理器 .

  3. 选择 编辑配置 并启用 管道处理器 勾选其旁边的复选框。

  4. 拖动 管道处理器 使其位于 消息过滤链 之后。使用左侧的六个点进行拖动。

  5. 点击 更新配置 .

创建和管理规则

规则可通过 规则构建器 源代码编辑器 .

规则构建器(默认视图)提供了一种引导式的可视化规则创建方式。

要切换至手动编辑模式,请从创建菜单中选择 使用源代码编辑器

警告

您可以将规则从规则构建器转换为源代码编辑器,但无法逆向转换。

使用规则构建器创建规则

规则构建器提供了一种可视化、结构化的方法,可直接在 安全数据湖 界面中编写规则。

每条规则遵循简单的“当→则”模式:

  • 用于定义触发规则的条件。

  • 用于定义满足条件时执行的操作。

两个输入框均支持可搜索的下拉菜单。输入函数名的前几个字母会显示建议和简短描述。完整函数列表请参阅《函数文档》。

使用规则构建器创建规则的步骤如下:

  1. 前往 系统 > 管道 > 管理规则 .

  2. 选择 创建规则 .

  3. 创建when语句。

  4. (可选)您可以添加其他语句,并使用 运算符进行组合,这些运算符可从 部分的右上角区域选择。

  5. 创建一个 然后 语句以指定操作。

如果“然后”语句产生一个值,输出变量将自动显示,并可在后续语句中重复使用。

注意

规则可立即在 规则模拟 模块中进行测试。

使用源代码编辑器创建规则

您也可以手动编写规则,在 源代码编辑器 中使用“当”和“然后”语句。此视图支持完整的语法编辑,并包含函数及其描述的快速参考列表。

要使用源代码编辑器创建规则,请遵循以下规则:

  1. 转到 系统 > 管道 > 管理规则 .

  2. 选择 创建规则 .

  3. 选择 使用源代码编辑器 (位于页面右上角)。

  4. 配置规则。

    注意

    请参阅 管道规则逻辑 以获取语法详情。

  5. 选择 创建规则 .

与规则构建器类似,您可以在保存前通过规则模拟模块验证规则。

模拟管道规则

模拟功能允许您在部署前测试规则。您可以模拟完整消息或单个字段。在模拟框中输入原始消息字符串、键值对或JSON有效载荷。

模拟器会逐步显示分配的输出变量和处理结果。

提示

每条规则会保存最后使用的消息,因此随时可用于模拟。

运行模拟的步骤如下:

  1. 前往 系统 > 管道 > 模拟器 .

  2. 选择“运行规则模拟”。

  3. 输入示例消息。

  4. 查看处理后的输出结果。

  5. 如有需要,可重置或调整规则后重新运行。

管理流水线

创建规则后,您可将其组合成处理并丰富消息的流水线。请前往 系统 > 流水线 > 管理流水线 以创建、编辑或删除流水线。

每个流水线包含一个或多个定义执行顺序和逻辑的阶段。

创建流水线

新建流水线需遵循以下步骤:

  1. 前往 系统 > 流水线 > 管理流水线 .

  2. 点击屏幕右上角的 添加新流水线 按钮。

  3. 为流水线输入描述性名称和说明,并选择???

  4. 编辑连接 下方的 流水线连接 区域中

    选择 编辑连接 窗口已显示。

  5. 字段下,选择要附加的流。

    注意

    管道仅作用于其所连接流中的消息。多个管道可处理同一流;其规则按阶段优先级运行。

    提示

    所有消息 ”流是所有传入数据的默认入口点,也是处理路由、过滤或字段丰富化的通用管道的理想位置。

    选择后,它们将被添加到菜单下方的列表中。您可以选择 移除 将其从列表中删除。

  6. 选择 添加新阶段 并配置阶段:

    1. 阶段 下输入阶段优先级,该值决定管道在序列中的执行顺序。此数字可为任意整数,数值较小的阶段优先运行。

    2. 选择如何处理后续阶段的规则:

      • 本阶段所有规则均匹配消息 - 仅当满足所有条件时继续下一阶段。

      • 本阶段至少一条规则匹配消息 - 满足任一条件即继续下一阶段

      • 本阶段无规则或任意数量规则匹配 - 仅当不满足任何条件时继续下一阶段。

    3. 在阶段规则下,选择要应用的规则。

    4. 选择 添加阶段 保存信息。

  7. 如需添加更多阶段,请继续操作。

    注意

    每个新建阶段会在 流水线 菜单下新增一个版块。点击 编辑 修改阶段详情,或点击 删除 移除该阶段。

添加所有阶段后,流水线即构建完成,将显示在流水线页面。连接至数据流后,系统将根据您定义的规则与逻辑自动开始处理传入消息。

数据流测试与流水线模拟

使用流水线模拟器预览消息在当前流水线设置下的处理过程。测试数据流步骤如下:

  1. 进入 系统 > 流水线 > 模拟器 .

  2. 数据流 下选择待测数据流。

  3. 原始消息 中输入与传入日志格式相同的原始样本消息(例如GELF格式消息)。

  4. (可选)指定源IP、输入类型及编解码器(日志消息解析机制)。

模拟执行后将显示:

  • 变更摘要 ——列出被修改、新增或删除的字段。

  • 结果预览 – 显示完整处理后的消息。

  • 模拟追踪 – 详细记录执行的规则和管道及其耗时。

编辑与管道阶段

所有管道均显示在 系统 > 管道 > 管理管道 页面下。针对每个管道,您可选择 删除 移除管道,或 编辑 修改其配置。

应用场景

本文介绍在 安全数据湖 中创建和应用管道规则的实际用例。这些示例演示了如何过滤无效日志、丰富消息数据,并将消息路由至特定流或告警系统。参考这些场景来设计和实施高效管道规则,以优化日志数据的处理与分析流程。

规则

条件示例

动作示例

规则语法

匿名化处理

管道规则可在消息存储或转发前对敏感数据进行编辑或删除。

通过掩码IP地址、用户名或个人标识符等信息,确保符合隐私标准。

检查是否存在 source_ip 字段。

移除 source_ip 字段以消除敏感数据。

规则 "屏蔽敏感信息"
当
    存在字段("source_ip")
则
    移除单个字段("source_ip");
结束

面包屑

面包屑规则为消息添加元数据,以便跨系统或阶段追踪其流向。

这些规则常用于调试、标记或跟踪消息处理过程。

留空。这将确保规则适用于所有传入消息。

添加或更新字段 demo 为静态值。

规则 "设置演示字段"
当
    真
则
    设置字段("rule_demo", "test");
结束

过滤

过滤规则可丢弃非必要消息,减少数据摄入量和许可证使用。

检查 testing 字段是否存在。

完全丢弃该消息,防止其被存储或进一步处理。

规则 "丢弃测试消息"
当
    存在字段("testing")
则
    丢弃消息();
结束

修改

修改规则可变更消息内容,例如重新格式化时间戳或更新字段值。

检查消息是否包含 event_time 字段。

将时间戳从UTC转换为英国时间,并写入名为 event_time_uk .

规则 "将event_time转换为英国时区"
当
    存在字段("event_time")
则
    令 event_time_date = 解析日期(
    值: 转字符串($消息.event_time),
    模式: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", // 根据需要调整此模式
    时区: "UTC"
    );
    令 event_time_uk = 格式化日期(
    值: event_time_date,
    日期格式: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
    时区: "Europe/London"
    );
    设置字段("event_time_uk", event_time_uk
    );
结束

丰富

丰富规则通过添加额外上下文或更新现有数据值来增强消息。

检查字段 Src_ip_geo_country 是否存在且等于 美国 .

将该字段值更新为 UniSt .

规则 "SrcCountryUnitedStates"
当
    存在字段("Src_ip_geo_country") &&
    转字符串($消息.Src_ip_geo_country) == "US"
则
    设置字段(
        字段: "Src_ip_geo_country",
        值: "UniSt",
        清除字段: false
    );
结束

路由

路由规则将特定消息发送至另一流,并可选地从当前流中移除。

提示

在规则中引用目标流前,请预先创建该目标流。

检查 gl2_remote_ip 字段是否存在且匹配特定字符串值。

将消息路由至目标流( 我的第一个流 )并从默认流中移除。

规则 "路由消息至流"
当
    存在字段("gl2_remote_ip") &&
    转字符串($消息.gl2_remote_ip) == "66914166ac1d1568bad817f3"
则
    路由至流(
        名称: "我的第一个流",
        从默认流移除: true
    );
结束

管道函数

函数是管道规则的构建模块。每个函数都是预定义方法,在日志消息流经 安全数据湖 处理管道时对其执行特定操作。

函数可接受一个或多个参数,并返回决定消息如何转换、丰富或路由的输出结果。通过在条件和动作中组合函数,您能定义强大的处理逻辑,使消息处理符合组织需求。

完整支持的 安全数据湖 函数列表、描述及语法示例,请参阅 函数参考 .

语法

安全数据湖中的 管道函数 采用Java实现并设计为可插拔架构,便于您轻松扩展平台的处理能力。

从概念上讲,函数接收参数(如当前消息上下文)并返回值。参数和返回值的类型决定了该函数在规则中的适用场景。 安全数据湖 会自动验证这些类型,确保所有规则在逻辑和语法上均正确无误。

函数参数可通过命名键值对或位置顺序传递,但需确保所有可选参数声明在最后并最后处理。

Java数据类型

管道规则在构建查询或执行计算时可以使用某些Java数据类型。这仅限于通过 GET 函数查询的类型。

例如,您可以使用 .millis 属性(属于 DateTime Period 对象)来获取以毫秒为单位的时间值。

这使您能够执行精确的时间计算,例如测量消息相对于当前时间的存在时长。

规则 "时间差计算器(毫秒)"
当
    条件为真
则
    定义时间差 =
        转换为长整型(
            解析日期(
                值: 转换为字符串(当前时间(时区: "欧洲/柏林")),
                格式: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
                地区: "de_DE"
            ).毫秒
        )
        -
        转换为长整型(
            解析日期(
                值: 转换为字符串($消息.时间戳),
                格式: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
                地区: "de_DE"
            ).毫秒
        );

    设置字段("扫描时间差_毫秒", 时间差);
结束

本示例中,该规则计算当前时间("欧洲/柏林"时区)与消息时间戳之间的差值,并将两个值转换为毫秒。

计算结果( 扫描时间差_毫秒 )表示事件以毫秒为单位的存续时长,并作为新字段存储于消息中。

警告

安全数据湖 不支持使用任何未经官方文档记载的函数。若测试任何不受支持的函数数据类型,请谨慎操作。

函数类型

内置 安全数据湖 函数可按以下类型分类。完整函数列表及说明请参阅 函数参考 .

匿名化

匿名化函数可对数据集或日志消息中的敏感数据进行脱敏处理。

资产增强

资产增强函数可提升、检索或移除与资产相关的日志数据。详见 资产增强 获取更多信息 安全数据湖 安全功能。

布尔值

布尔数据主要与条件语句相关联,通过根据条件评估结果为 来改变控制流,从而允许不同的操作。布尔函数用于确定布尔值或运算符。

转换

转换函数用于将值从一种格式转换为另一种格式。

日期/时间

日期/时间函数对日期和时间值执行操作或计算。

调试

调试函数用于确定程序在任意执行点的状态。

编码

编码函数可用于解码和转换字符串。

列表

列表函数可创建或检索用于分析的可操作集合。

查找

查找函数允许您在数据库中搜索某个值,然后从同一记录返回附加信息。

映射

映射函数对集合中的每个或所有元素应用指定操作。

消息处理

消息处理函数定义对消息的响应操作。在构建管道规则时,这些函数用于日志数据的各种丰富、移除、检索和路由操作。

模式匹配

模式匹配函数用于指定数据应符合的模式,并根据这些模式解构数据。

字符串

字符串函数用于操作字符串或查询字符串相关信息。

监视列表

监视列表函数提供检索或修改监视列表的操作功能。

函数参考

下列列表描述了 安全数据湖 .

函数

类别

描述

语法

缩写

字符串

使用省略号缩写字符串。宽度参数定义结果字符串的最大长度。

abbreviate(值:字符串,宽度:长整型)

abusech_ransom_lookup_domain

字符串

将域名与abuse.ch勒索软件域名黑名单进行匹配 RW_DOMBL .

abusech_ransom_lookup_domain(域名):通用查询结果

abusech_ransom_lookup_ip

字符串

将IPv4或IPv6地址与abuse.ch勒索软件域名黑名单进行匹配 RW_DOMBL .

abusech_ransom_lookup_ip(IP地址):通用查询结果

add_asset_categories

资产增强

为资产添加分类列表

add_asset_categories(资产名称:字符串,分类:列表)

anonymize_ip

匿名化

通过将最后一个八位字节设置为 0 .

anonymize_ip(IP):IP地址

array_contains

消息处理

检查数组中是否包含指定元素

参见示例

array_contains(元素,值,[区分大小写]):布尔值

array_remove

消息处理

从数组中移除指定元素

参见示例

array_remove(元素列表,值,[是否全部移除]):列表

base16_decode

字符串

提供返回小写字母的字符串base16解码功能。要求输入标准十六进制字符(0-9 A-F)。

base16_decode(值,[是否省略填充:布尔值])

base16_encode

字符串

提供使用16字符子集的标准不区分大小写十六进制编码功能。要求输入标准十六进制字符(0-9 A-F)。

base16_encode(值,[是否省略填充:布尔值])

base32_decode

字符串

使用32字符子集对字符串进行解码。采用"数字型"base32编码,从传统十六进制字母表扩展而来(0-9 A-V)。

base32_decode(值,[是否省略填充:布尔值])

base32_encode

字符串

使用32字符子集对字符串进行编码。采用"数字型"base32编码,从传统十六进制字母表扩展而来(0-9 A-V)。

base32_encode(值,[是否省略填充:布尔值])

base32human_decode

字符串

使用32字符子集解码人类可读格式的字符串。采用"易读型"base32编码,避免0/O或1/I的混淆(A-Z 2-7)。

base32human_decode(值,[是否省略填充:布尔值])

base32human_encode

字符串

使用32字符子集将字符串编码为人类可读格式。采用"易读型"base32编码,避免0/O或1/I的混淆(A-Z 2-7)。

base32human_encode(值,[是否省略填充:布尔值])

base64_decode

字符串

使用64字符子集对字符串进行解码。标准base64允许大小写字母混用,无需人类可读。

base64_decode(值,[是否省略填充:布尔值])

base64_encode

字符串

使用64字符子集解码字符串。常规base64允许大小写字母,无需具备可读性。

base64_encode(value,[omit_padding:boolean])

base64url_decode

字符串

提供URL安全的64字符子集字符串解码,适合作为文件名或直接传入URL而无需转义。

base64url_decode(value,[omit_padding:boolean])

base64url_encode

字符串

提供URL安全的64字符子集字符串编码,适合作为文件名或直接传入URL而无需转义。

base64url_encode(value,[omit_padding:boolean])

capitalize

字符串

将字符串首字母转为大写形式。

capitalize(value:string)

cidr_match

布尔值/消息函数

检查给定IP地址对象是否匹配cidr模式。

另请参阅: to_ip

cidr_match(cidr:string,ip:IpAddress)

clone_message

消息处理

克隆消息。若 message 参数省略,则默认使用当前处理中的消息。

clone_message([message:Message])

concat

字符串

返回一个合并了 第一个第二个 字符串文本的新字符串。 concat 函数仅能连接两个字符串。若需拼接超过两个子字符串,必须多次调用 concat 函数。

参见示例

concat(first:string,second:string)

contains

字符串

检查字符串是否包含另一字符串(忽略大小写)。

参见示例

contains(value:string,search:string,[ignore_case:boolean])

crc32

字符串函数/编码

返回给定字符串的十六进制CRC32摘要值。

crc32(value:string)

crc32c

字符串函数/编码

返回给定字符串的十六进制CRC32C摘要值(符合RFC 3720第12.1节规范)。

crc32c(value:string)

create_message

消息处理

根据给定参数创建新消息。若省略任何参数,则从当前处理消息的对应字段中取值。若 timestamp 参数被省略,所创建消息的时间戳将采用当前时刻的时间戳。

create_message([message:string],[source:string],[timestamp:DateTime])

csv_to_map

转换

将CSV字符串的单行转换为可供 set_fields .

另请参阅: set_fields

csv_to_map(value,fieldNames,[separator],[quoteChar],[escapeChar],[strictQuotes],[trimLeadingWhitespace],[ignoreExtraFieldNames])

days

日期/时间

创建一个时间周期,长度为 value 天。

另请参阅: is_period , period

days(value:long)

debug

调试

将传入值以字符串形式打印到 安全数据湖 日志中。注意调试信息仅会出现在处理待调试消息的 安全数据湖 节点日志中。

查看示例

debug(value:any)

drop_message

消息处理

移除给定的 消息 在规则执行完成后移除。这不会阻止同一管道的后续阶段对该消息进行处理。若 消息 参数省略,则函数默认使用当前正处理的消息。此功能可用于实现基于不同条件的灵活黑名单机制。

参见示例

drop_message(message:Message)

ends_with

字符串

检查 是否以 后缀 结尾,可选择忽略字符串大小写。

参见示例

ends_with(value:string,suffix:string,[ignore_case:boolean])

expand_syslog_priority

转换

syslog优先级数值 转换为其对应的级别和设施。

expand_syslog_priority(value:any)

expand_syslog_priority_as_string

转换

syslog优先级数值 转换为对应的严重性和设施字符串表示形式。

expand_syslog_priority_as_string(value:any)

first_non_null

列表

返回指定列表中首个非空元素 空值 。若列表为空则返回空值。

first_non_null(value:list)

flatten_json

字符串

字符串解析为JSON树结构,同时将所有容器扁平化为单层。JSON数组的解析方式由 array_handler 参数值决定。可用选项包括: array_handler

  • ignore :忽略所有顶层数组。

  • json :将顶层数组作为有效JSON字符串返回。

  • flatten :将所有数组和对象展开为顶层键值对。

flatten_json(value,array_handler):JsonNode

flex_parse_date

日期/时间

使用 Natty日期解析器 解析日期和时间 。若未在模式中检测到时区,则使用可选参数 timezone 作为假定时区。若未指定则默认为 UTC 。若解析器未能检测到有效日期时间,则返回默认日期时间;否则表达式将评估失败并中止执行。

另请参阅: is_date

flex_parse_date(value:string,[default:DateTime],[timezone:string])

format_date

日期/时间

返回给定的日期和时间 并根据指定的 格式 字符串进行格式化。若未指定时区,则默认采用 UTC .

format_date(value:DateTime,format:string,[timezone:string])

from_forwarder_input

消息处理

检查当前处理的消息是否通过指定转发器输入接收。可通过输入 名称 (不区分大小写)或其 ID .

from_forwarder_input(id:string|name:string)

from_input

消息处理

检查当前处理的消息是否通过指定(非转发器)输入接收。可通过输入 名称 (不区分大小写)或其 ID .

from_input(id:string|name:string)

get_field

消息处理

获取 用于字段。

get_field(field,[message]):Object

grok

模式匹配

应用grok模式 grok 。返回一个匹配对象,包含字段名和值的映射。可设置 only_named_capturestrue 以仅返回命名捕获的匹配结果。执行 grok 函数的结果可作为 set_fields 的参数,将提取的字段设置到消息中。

另请参阅: set_fields

grok(pattern:string,value:string,[only_named_captures:boolean])

grok_exists

布尔值

检查给定的Grok模式是否存在。 log_missing 决定当未找到匹配模式时是否生成日志消息。

grok_exists(pattern:string,[log_missing:boolean])

has_field

布尔/消息函数

检查给定的 消息 是否包含名为 字段 的字段。若 消息 参数省略,则默认使用当前处理中的消息。

has_field(字段:字符串,[消息:消息])

小时

日期/时间

创建一个持续 小时数的时间段。

hours(值:长整型)

in_private_net

消息处理

检查IP地址是否属于RFC 1918(10.0.0.0/8、172.16.0.0/12、192.168.0.0/16)或RFC 4193(fc00::/7)定义的私有网络。

in_private_net(IP地址):布尔型

is_bool

布尔型

检查给定的 是否为布尔值( ).

is_bool(值:任意类型)

is_collection

布尔值

检查给定的 是否为可迭代集合。

is_collection(value:any)

is_date

布尔值

检查给定的 是否为日期类型( DateTime ).

另请参阅: now , parse_date , flex_parse_date , parse_unix_milliseconds

is_date(value:any)

is_double

布尔值

检查给定的 是否为浮点数值( double ).

另请参阅: to_double

is_double(value:any)

is_ip

布尔值

检查给定的 是否为IP地址(IPv4或IPv6)。

参见: to_ip

is_ip(value:any)

is_json

布尔值

检查给定的 是否为解析后的JSON树。

参见: parse_json

is_json(value:any)

is_list

布尔值

检查 是否为可迭代列表。

is_list(value:any)

is_long

布尔值

检查 是否为整数值(类型为 长整型 ).

另请参阅: to_long

is_long(value:any)

is_map

Boolean

检查给定的 是否为映射。

另请参阅: to_map

is_map(value:any)

is_not_null

Boolean

检查 是否不为 .

参见示例

is_not_null(value:any)

is_null

Boolean

检查 是否为 .

参见示例

is_null(value:any)

is_number

布尔型

检查给定 是否为数值类型( 长整型双精度浮点型 ).

另请参阅: is_double , to_double , is_long , to_long

is_number(value:any)

is_period

布尔型

检查给定 是否为时间周期类型( period ).

另请参阅: years , months , weeks , , 小时 , 分钟 , , 毫秒 , 时间段

is_period(value:any)

是否为字符串

布尔值

检查 是否为字符串。

另请参阅: 转为字符串

is_string(value:any)

是否为URL

布尔值

检查给定 是否为解析后的URL。

另请参阅: 转为URL

is_url(value:any)

拼接

字符串

将指定范围内的数组元素连接为单个字符串。起始索引默认为 0 ,结束索引默认为列表最后一个元素的索引。若指定分隔符,则元素间会以该分隔符连接。

join(elements:list,[delimiter:string],[start:long],[end:long])

键值对

布尔值

从给定 中提取键值对,并以字段名和值的映射形式返回。可选项包括:

  • 分隔符 :用于分隔键值对的字符。字符串中的每个字符都会被单独使用,因此无需额外分隔。默认值: <[空格]> .

  • 键值分隔符 :用于分隔键与值的字符。同样不需要分隔每个字符。默认值: = .

  • 忽略空值 :跳过包含空值的键。默认值: true .

  • 允许重复键 :是否允许键重复。默认值: true .

  • 处理重复键 :当 允许重复键 时,处理重复键的方式。可选值 take_first ,表示仅使用该键的第一个值;或 take_last ,表示仅使用该键的最后一个值。将此选项设置为其他任何值将切换为连接处理模式,即合并该键的所有给定值,并用此选项设置的分隔符隔开。例如,设置 handle_dup_keys:"," 会将键 a 的所有给定值用逗号连接,例如 1,2,foo 。默认值: take_first .

  • trim_key_chars :需要从键的首尾剔除的字符。默认值: 不剔除 .

  • trim_value_chars :需要从值的首尾剔除的字符。默认值: 不剔除 .

另请注意,执行 key_value 函数的结果可作为参数传递给 set_fields 以将提取的字段设置到消息中。

另请参阅: set_fields

key_value(value:string,[delimiters:string],[kv_delimiters:string],[ignore_empty_values:boolean],[allow_dup_keys:boolean],[handle_dup_keys:string],[trim_key_chars:string],[trim_value_chars:string])

length

字符串

计算字符串中的字符数。若bytes=true,则改为计算字节数(假设采用UTF-8编码)。

length(value:string,[bytes:boolean])

list_count

List

获取列表中元素的数量。

list_count(list:list):Long

list_get

List

从列表中获取值。

list_get(list:list,index:long):Object

lookup

Lookup

在指定查找表中查询多值。

参见示例

lookup(lookup_table:string,key:any,[default:any])

lookup_add_string_list

Lookup

在指定查找表中添加字符串列表,成功时返回更新后的列表,失败时返回 null 此函数目前仅支持 MongoDB查找表 (截至撰写时)。

lookup_add_string_list(lookup_table,key,value,[keep_duplicates])

lookup_all

Lookup

在指定查找表中查询所有提供的值,并以数组形式返回全部结果。

参见示例

lookup_all(lookup_table,keys):list

lookup_assign_ttl

查找

为指定查找表中的键添加生存时间。成功时返回更新后的条目, 空值 失败时返回。

lookup_assign_ttl(lookup_table,key,ttl):对象

lookup_clear_key

查找

清除(移除)指定查找表中的键。 此函数目前仅支持 MongoDB查找表 (截至本文撰写时)。

lookup_clear_key(lookup_table,key)

lookup_has_value

查找

判断给定 是否存在于查找表中。若键存在则返回 ,若不存在则返回

lookup_has_value(lookup_table,key)

lookup_remove_string_list

查找

从指定查找表中移除给定字符串列表的条目。成功时返回更新后的列表, 空值 失败时返回。 此函数目前仅支持 MongoDB查找表 (截至本文撰写时)。

lookup_remove_string_list(lookup_table,key,value)

lookup_set_string_list

查找

在指定的查找表中设置一个字符串列表。成功时返回新值,失败时返回 null 此函数目前仅支持 MongoDB查找表

lookup_set_string_list(lookup_table:string,key:string,value:list)

lookup_set_value

查找

在指定的查找表中设置单个值。成功时返回新值,失败时返回 null 此函数目前仅支持 MongoDB查找表

lookup_set_value(lookup_table,key,value)

lookup_string_list

查找

在指定的查找表中查找字符串列表值。 此函数目前仅支持 MongoDB查找表

lookup_string_list(lookup_table,key,[default])

lookup_string_list_contains

布尔

查找 在指定查找表中通过键引用的字符串列表中查找。返回 仅当键值映射存在时,否则返回 .

lookup_string_list_contains(查找表,键,值)

lookup_value

查找

查找单个 在指定的查找表中。

参见示例

lookup_value(查找表:字符串,键:任意类型,[默认值:任意类型])

lowercase

字符串

字符串 转换为小写。区域设置(IETF BCP 47语言标签)默认为 en .

lowercase(值:字符串,[区域设置:字符串])

machine_asset_lookup

资产丰富

查找单个机器资产。如果多个资产匹配输入参数,则仅返回一个。

machine_asset_lookup(查找类型,值):映射

machine_asset_update

资产丰富

更新机器资产的IP或MAC地址。如果多个资产匹配输入参数,则仅选择一个。

machine_asset_update(查找类型,查找值,[IP地址列表],[主机名列表]):空

map_copy

映射

从映射中检索一个值。

map_copy(map):映射

map_get

映射

将映射复制到新映射。

map_get(map,key):对象

map_remove

映射

从映射中移除一个键。

map_remove(map,key):映射

map_set

映射

设置映射中的一个键。

map_set(map,key,value):映射

md5

字符串

生成值的十六进制编码MD5摘要 .

md5(value:字符串)

metric_counter_inc

调试

统计特定指标条件。计数器指标名称 名称 始终会以 org.graylog.rulemetrics 为前缀。默认值为 1 (若未指定增量值)。

metric_counter_inc(名称,[值]):空

毫秒

日期/时间

创建一个时间周期,其值为 指定的毫秒数。

另请参阅: is_period , period

millis(值:长整型)

分钟

日期/时间

创建一个时间周期,其值为 指定的分钟数。

另请参阅: is_period , period

minutes(值:长整型)

月份

日期/时间

创建一个时间周期,其值为 指定的月数。

另请参阅: is_period , period

months(value:long)

multi_grok

对字符串应用一组Grok模式并返回首个匹配项。

参见示例

multi_grok(patterns,value,[only_named_captures]):GrokMatch$GrokResult

murmur3_128

编码

生成输入值的十六进制MurmurHash3(128位)摘要。 .

murmur3_128(value:string)

murmur3_32

编码

生成输入值的十六进制MurmurHash3(32位)摘要。 .

murmur3_32(value:string)

normalize_fields

消息处理

将所有字段名转为小写以实现标准化。

normalize_fields([message]):Void

now

日期/时间

返回当前 日期时间 。使用默认时区 UTC .

另请参阅: is_date

now([timezone:string])

otx_lookup_domain

字符串

查询域名在AlienVault OTX威胁情报数据中的信息。需要配置名为 otx-api-domain .

参见示例

otx_lookup_domain(domain_name:string):OTXLookupResult

otx_lookup_ip

字符串

查询IPv4或IPv6地址在AlienVault OTX威胁情报数据中的信息。需要配置名为 otx-api-ip .

参见示例

otx_lookup_ip(ip_address:string):OTXLookupResult

parse_cef

字符串

将任何CEF格式的字符串解析为其字段。这是CEF字符串(以 CEF: 开头),不包含syslog信封。

parse_cef(cef_string,use_full_names):CEFParserResult

parse_date

日期/时间

使用给定的日期格式解析日期字符串。

parse_date(value:string,pattern:string,[locale:string],[timezone:string])

parse_json

字符串

解析 字符串解析为JSON,返回生成的JSON树。

另请参阅: to_map

parse_json(value:string)

parse_unix_milliseconds

日期/时间

尝试将UNIX毫秒时间戳(自1970-01-01T00:00:00.000Z以来的毫秒数)解析为正确的 DateTime 对象。

另请参阅: is_date

参见示例

parse_unix_milliseconds(value:long)

period

日期/时间

.

另请参阅: is_period , , , , , 小时 , 分钟 , , 毫秒

period(value:string)

正则表达式

模式匹配

使用Java语法将字符串与正则表达式进行匹配。

regex(pattern:string,value:string,[group_names:array[string])

regex_replace

模式匹配

将value与pattern中的正则表达式进行匹配,若匹配成功则用 replacement 替换。可在替换字符串中使用编号捕获组并引用它们。若 replace_all 设为 true 则替换所有匹配项;否则仅替换首个匹配项。

参见示例

regex_replace(pattern:string,value:string,replacement:string,[replace_all:boolean])

remove_asset_categories

资产增强

从资产中移除分类列表。

remove_asset_categories(asset_name,categories):Void

remove_field (legacyDeprecated)

消息处理

从给定 字段 中移除指定名称的字段 消息 (除非该字段被保留)。若 消息 参数省略,则默认使用当前正在处理的消息。

替代方案参见: remove_single_field , remove_multiple_fields

remove_field(field:string,[message:Message])

remove_from_stream

消息处理

从指定流中移除 消息 。可通过流 名称ID 定位目标流。若 消息 参数省略,则默认使用当前处理的消息。若消息最终未归属任何流,将自动路由回默认流“所有消息”,确保消息不会因复杂的流路由规则而丢失。

如需彻底丢弃消息,请使用 drop_message 函数。 remove_from_stream 操作后消息仍会在后续阶段继续处理。若要中止处理,请使用 drop_message ,或通过配置阶段条件使后续阶段不执行 remove_from_stream 已被调用。

remove_from_stream(id:字符串|name:字符串,[message:消息])

remove_multiple_fields

消息处理

移除匹配正则表达式模式及/或名称列表的字段,除非字段名被保留。

remove_multiple_fields([pattern:字符串],[names:列表],[message:消息])

remove_single_field

消息处理

从消息中移除单个字段,除非字段名被保留。

remove_single_field(field:字符串,[message:消息])

rename_field

消息处理

修改字段名称 old_fieldnew_field 并保持字段值不变。

rename_field(old_field:字符串,new_field:字符串,[message:消息])

replace

字符串

替换字符串中首次出现的 max 或所有匹配的子串。 max 默认为 -1 表示替换全部匹配项,使用 1 则仅替换第一个匹配项。 2 前两个替换,以此类推。

参见示例

替换(值:字符串,搜索:字符串,[替换项:字符串],[最大次数:长整型])

路由到流

消息处理

将消息的流分配设置为指定流。功能等同于'复制'且不会从当前流移除消息。若未指定 消息 参数,则默认使用当前处理中的消息。这将导致消息在连接到该流的管道上被评估,除非该流已处理过此消息。若 移除默认流 参数设为 ,则消息会同时从默认流“所有消息”中移除。 移除默认流 操作将在当前管道解析完成后生效。此规则不影响管道后续阶段对消息的处理。可通过指定流的 名称ID .

参见示例

路由到流(ID:字符串|名称:字符串,[消息:消息对象],[移除默认流:布尔值])

秒数

日期/时间

创建以 数值 秒数为单位的时间段。

另请参阅: 是否为时间段 , 时间段

秒数(值:长整型)

选择_json路径

映射

针对JSON树评估给定的 路径 并返回结果值的映射。

另请参阅: 是否为_json , 解析_json

选择_json路径(json:Json节点,路径:映射<字符串,字符串>)

设置关联资产

资产增强

添加关联资产信息。

设置关联资产([消息]):空

设置字段

消息处理

将指定的 字段 设置为新值。该 字段 名称必须有效且不能包含句点字符。前后空格会被修剪。字符串值的空格也会被修剪。可选的 前缀后缀 参数指定应添加到插入字段名称的前缀或后缀。可选的 清理字段 参数会将无效字段名称字符替换为下划线。如果 消息 被省略,则此函数使用当前处理的消息。使用 默认值 当无可用值时(即值为 或抛出异常)。

另请参阅: 设置字段集

设置字段(字段:字符串,值:任意,[前缀:字符串],[后缀:字符串],[消息:消息],[默认值:任意],[清理字段:布尔值])

设置字段集

消息处理

将给定的所有名称-值对设置到指定消息的 字段 中。这是一个类似 设置字段 的便捷函数,适用于将 选择JSON路径正则表达式 等函数的结果应用于当前处理的消息,特别是当键名是正则表达式结果时。可选的 前缀后缀 参数用于指定插入字段名称前/后添加的内容。可选的 清理字段 参数会将无效字段名字符替换为下划线。若省略 消息 参数,则默认使用当前处理的消息。

另请参阅: 设置字段 , 转为映射 , grok , key_value

set_fields(fields:Map<string,any>,[prefix:string],[suffix:string],[message:Message],[clean_fields:boolean)

sha1

编码

生成该值的十六进制编码SHA1摘要 .

sha1(value:string)

sha256

编码

生成该值的十六进制编码SHA256摘要 .

sha256(value:string)

sha512

编码

生成该值的十六进制编码SHA512摘要 .

sha512(value:string)

spamhaus_lookup_ip

查询

将IP地址与Spamhaus DROP和EDROP列表进行匹配。

spamhaus_lookup_ip(ip_address):GenericLookupResult

split

字符串

根据给定模式分割字符串。使用Java语法。

split(pattern:string,value:string,[limit:int])

starts_with

字符串

检查 是否以 前缀 开头,可选择忽略字符串的大小写。

参见示例

starts_with(value:string,prefix:string,[ignore_case:boolean])

string_array_add

字符串

将指定的字符串(或字符串数组) 添加到提供的字符串数组中。将输入数组和值/值数组转换为字符串。

参见示例

string_array_add(elements,value,[only_unique]):list

string_entropy

字符串

计算给定字符串中字符分布的香农熵。

string_entropy(value:string,[default:double])

substring

字符串

返回 的一个子字符串,从 起始 偏移量(基于零的索引)开始,可选择在结束偏移量处结束。两个偏移量都可以为负数,表示相对于 .

参见示例

substring(value:string,start:long,[end:long])

swapcase

字符串

交换字符串的大小写 字符串 将大写和标题格式转为小写,小写转为大写。

swapcase(value:string)

syslog_facility

转换

syslog设施编号 转换为其字符串表示形式。

syslog_facility(value:any)

syslog_level

转换

syslog严重性编号 转换为其字符串表示形式。

syslog_level(value:any)

threat_intel_lookup_domain

查找

将域名与所有启用的威胁情报源(OTX除外)进行匹配。

threat_intel_lookup_domain(domain_name,prefix):GlobalLookupResult

threat_intel_lookup_ip

查找

将IP地址与除OTX外所有启用的威胁情报源进行匹配。

threat_intel_lookup_ip(ip_address,prefix):GlobalLookupResult

to_bool

转换

使用参数的字符串值将其转换为布尔值。

to_bool(value:any)

to_date

转换

转换为日期。若未指定 时区 则默认采用 UTC .

另请参阅: is_date

to_date(value:any,[timezone:string])

to_double

转换

将第一个参数转换为双精度浮点数值。

to_double(value:any,[default:double])

to_ip

转换

将给定的 IP 字符串转换为 IpAddress 对象。

另请参阅: cidr_match

to_ip(ip:字符串)

to_long

转换

将第一个参数转换为长整型数值。

to_long(值:任意类型,[默认值:长整型])

to_map

转换

将给定的类映射值转换为有效映射。 to_map 函数目前仅支持将解析后的JSON树转换为映射,以便与 set_fields .

另请参阅: set_fields , parse_json

参见示例

to_map(值:任意类型)

to_string

转换

将第一个参数转换为其字符串表示形式。

to_string(值:任意类型,[默认值:字符串])

to_url

转换

将给定的 url 转换为有效URL。

to_url(url:任意类型,[默认值:字符串])

tor_lookup

查询

将IP地址与已知的Tor出口节点进行匹配,以识别来自Tor网络的连接。

tor_lookup(ip_address):GenericLookupResult

流量统计大小

消息处理

计算整个消息的大小,包括所有额外字段。该值也用于确定消息对许可证使用量的贡献程度。

参见示例

traffic_accounting_size[(message)]:long

首字母小写

字符串

将字符串的首字母转换为小写。

uncapitalize(value:string)

大写

字符串

将字符串转换为大写。区域设置(IETF BCP 47语言标签)默认为 en .

uppercase(value:string,[locale:string])

URL解码

字符串

使用特定的编码方案解码application/x-www-form-urlencoded字符串。

urldecode(value:string,[charset:string])

URL编码

字符串

使用特定的编码方案将字符串转换为application/x-www-form-urlencoded格式。有效的字符集包括,例如, UTF-8 , US-ASCII 等。默认为 UTF-8 .

url编码(值,[字符集])

用户资产查询

资产丰富化

查询单个用户资产。若多个资产匹配输入参数,仅返回其中一个。

用户资产查询(查询类型,值):映射

监视列表添加

监视列表

将值添加到指定类型的监视列表中。成功时返回 ,失败时返回 ,若监视列表配置不正确则抛出异常。

监视列表添加(类型,值):布尔型

监视列表包含

监视列表

在指定类型的监视列表中查找值。成功时返回 ,失败时返回 ,若监视列表配置不正确则抛出异常。

监视列表包含(类型,值):布尔型

监视列表移除

监视列表

从指定类型的监视列表中移除值。成功时返回 ,失败时返回 ,若监视列表配置不正确则抛出异常。

监视列表移除(类型,值):布尔型

日期/时间

创建一个时间周期,长度为 周数。

另请参阅: is_period , period

weeks(value:long)

whois_lookup_ip

查询

检索IP地址的WHOIS信息

whois_lookup_ip(ip_address,prefix):WhoisIpLookupResult

日期/时间

创建一个时间周期,长度为 年数。

另请参阅: is_period , period

years(value:long)

示例

函数

示例

array_contains

rule "array_contains"
when
    true
then
    set_field("contains_number", array_contains([1, 2, 3, 4, 5], 1));
    set_field("does_not_contain_number", array_contains([1, 2, 3, 4, 5], 7));
    set_field("contains_string", array_contains(["test", "test2"], "test"));
    set_field("contains_string_case_insensitive", array_contains(["test", "test2"], "TEST"));
    set_field("contains_string_case_sensitive", array_contains(["test", "test2"], "TEST", true));
end

array_remove

规则 "array_remove"
当
    真
则
    设置字段("remove_number", 数组移除([1, 2, 3], 2));
    设置字段("remove_string", 数组移除(["one", "two", "three"], "two"));
    设置字段("remove_missing", 数组移除([1, 2, 3], 4));
    设置字段("remove_only_one", 数组移除([1, 2, 2], 2));
    设置字段("remove_all", 数组移除([1, 2, 2], 2, 真));
结束

连接

令 构建消息_0 = 连接(转字符串($消息.协议), " 连接自 ");
令 构建消息_1 = 连接(构建消息_0, 转字符串($消息.源IP));
令 构建消息_2 = 连接(构建消息_1, " 至 ");
令 构建消息_3 = 连接(构建消息_2, 转字符串($消息.目标IP));
令 构建消息_4 = 连接(构建消息_3, " 端口 ");
令 构建消息_5 = 连接(构建消息_4, 转字符串($消息.目标端口));
设置字段("message", 构建消息_5);

包含

包含(转字符串($消息.主机名), "example.org", 真)

调试

丢弃来自<来源>的消息"令 调试消息 = 连接("丢弃来自 ", 转字符串($消息.来源));调试(调试消息);`

丢弃消息

规则 "丢弃超过16383字符的消息"
当    
    有字段("message") 且    
    正则匹配(模式: "^.{16383,}$", 值: 转字符串($消息.message)).匹配 == 真
则   
    丢弃消息();    
    // 添加调试消息以通知被丢弃的消息    
    调试( 连接("丢弃过大的消息来自 ", 转字符串($消息.来源)));
结束

以...结尾

返回 :

以...结尾 (  "Foobar Baz Quux" , "quux" , 真  );

返回 :

以...结尾 (  "Foobar Baz Quux" , "Baz"  ); `

grok存在

当
  grok存在("USERNAME")
则
  令 解析结果 = grok("%{USERNAME:username}", 转字符串($消息.message));
  设置字段("parsed_username", 解析结果.username);
结束

十六进制转十进制字节列表

十六进制转十进制字节列表(值: "0x17B90004");

返回: [23, 185, 0, 4]

十六进制转十进制字节列表(值: "0x117B90004");

返回: [1, 23, 185, 0, 4]

十六进制转十进制字节列表(值: "17B90004");

返回: [23, 185, 0, 4]

十六进制转十进制字节列表(值: "117B90004");

返回: [1, 23, 185, 0, 4]

十六进制转十进制字节列表(值: "not_hex");

返回:null

is_not_null

is_null(src_addr)

lookup

规则 "目标IP地理信息查询"
当 
    存在字段("dst_ip")
那么  
    令 geo = 查询("geoip-lookup", 转字符串($消息.dst_ip));
    设字段("dst_ip_geolocation", geo["coordinates"]); 
    设字段("dst_ip_geo_country_code", geo["country"].iso_code); 
    设字段("dst_ip_geo_country_name", geo["country"].names.en); 
    设字段("dst_ip_geo_city_name", geo["city"].names.en);
 结束

lookup_all

规则 "批量查询函数"
当
    true
那么
    令 values = 批量查询("lut_name", ["key1", "key2", "key3"]);
    设字段("values", values); 
结束

lookup_value

("ip_lookup", 转字符串($消息.src_addr));

multi_grok

当
  true
那么
  设字段集(
    字段集: 多模式匹配(
        模式: [
            "^ABC %{IPORHOST:msg_ip}: %{GREEDYDATA:abc_message}",
            "^123 %{IPORHOST:msg_ip}: %{GREEDYDATA:123_message}",
            "^ABC2 %{IPORHOST:abc_ip}: %{GREEDYDATA:abc_message}"
            ],
        值: 转字符串($消息.message),
        仅命名捕获: true
    )
  );
结束

otx_lookup_domain

规则 "解析IP到DNS"
当
    存在字段("source_ip")
    且 正则匹配(
        模式: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
        值: 转字符串($消息.source_ip)
        ).匹配 == true
那么
    令 rs = 查值("dns_lookups", 转字符串($消息.source_ip));
    设字段("source_ip_dns", 转字符串(rs));
结束

otx_lookup_ip

规则 "解析source_ip - OTX-API-IP"
当
    // 验证消息有source_ip字段
    存在字段("source_ip")
    // 验证源IP为IPv4格式
    且 正则匹配(
        模式: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
        值: 转字符串($消息.source_ip)
        ).匹配 == true
那么
    令 rs = OTX查询IP(转字符串($消息.source_ip));
    设字段集(rs);
结束

parse_unix_milliseconds

设字段("timestamp", timestamp);

regex_replace

令 username = 正则替换(".*user: (.*)", 转字符串($消息.message), "$1");

replace

令 new_field = 替换(转字符串($消息.message), "oo", "u");    // "fu ruft uta"
令 new_field = 替换(转字符串($消息.message), "oo", "u", 1); // "fu rooft oota"

route_to_stream

路由到流(id: "512bad1a535b43bd6f3f5e86");

starts_with

返回true:

starts_with("Foobar Baz Quux", "foo", true);

返回false:

starts_with("Foobar Baz Quux", "Quux");

string_array_add

规则 "字符串数组添加"
当
    true
那么
    设字段("add_number_to_string_array_converted", 字符串数组添加(["1", "2"], 3));
    设字段("add_number_array_to_string_array_converted", 字符串数组添加(["1", "2"], [3, 4]));
    设字段("add_string", 字符串数组添加(["one", "two"], "three"));
    设字段("add_string_again", 字符串数组添加(["one", "two"], "two"));
    设字段("add_string_again_unique", 字符串数组添加(["one", "two"], "two", true));
    设字段("add_array_to_array", 字符串数组添加(["one", "two"], ["three", "four"]));
结束

子字符串

= 截取字符串(转字符串($消息.消息), 0, 20);

转为映射

let json = 解析JSON(转字符串($消息.json负载));
let map = 转为映射(json);
设置字段(map);

流量统计大小

设置字段(
    字段: "许可证使用量",
    值: 流量统计大小() // 单位为字节
    //值: 流量统计大小() / 1024 // 单位为千字节
    );

日志增强

查找表

查找表允许您通过替换消息字段值或创建全新消息字段,来映射、转换或丰富日志数据。例如,您可以使用静态CSV文件将IP地址映射到主机名,或使用外部数据源为消息添加威胁情报、地理位置或资产信息。

此功能可通过内部系统或第三方集成的上下文增强原始日志数据,将其转化为更丰富、可操作的洞察。

组件

查找表系统由四个组件组成:

数据适配器用于实际查找值。它们可能从CSV文件读取、连接数据库或执行请求以获取查找结果。

数据适配器实现是可插拔的,可通过插件添加新适配器。

警告

CSV文件适配器会将整个文件内容读入堆内存。请确保相应调整堆内存大小。

缓存负责缓存查找结果以提高查找性能和/或避免过载数据库和API。它们是独立实体,使得可以为不同数据适配器重用缓存实现。这样,数据适配器无需关心缓存,也无需自行实现。

缓存实现是可插拔的,可通过插件添加新缓存。

提示

如果文件发生更改,CSV文件适配器会在每个检查间隔内刷新其内容。如果缓存被清除但检查间隔未到,查找可能会返回过期值。

查找表组件将数据适配器实例和缓存实例绑定在一起。它需要在转换器、管道函数和装饰器中使用查找表。

查找结果由查找表通过数据适配器返回,可包含两种类型的数据: 单值 多值 .

单值 可以是字符串、数字或布尔值,将用于转换器、装饰器和管道规则。在我们的CSV示例中查找IP地址对应的主机名时,该值即为主机名字符串。

多值 是一种映射或类字典数据结构,可包含多个不同值。当数据适配器能为一个键提供多个值时,此功能非常有用。例如geo-ip数据适配器不仅提供IP地址的经纬度,还包含该地理位置的城市和国家信息。当前多值仅能在使用 lookup() 管道函数时应用于管道规则中。

示例1: 包含单值与多值的CSV数据适配器输出示例。

example 1.png

示例2: 包含单值与多值的geo-ip数据适配器输出示例。

example 2.png

配置流程

您可在 系统 > 查找表 窗口配置查找表。

每个查找表至少需要一个数据适配器和一个缓存。

  1. 创建数据适配器:

    1. 前往 系统 > 查找表 > 数据适配器 .

    2. 选择 创建适配器 并选择数据适配器类型。

    3. 填写适配器配置表单,其中包含每种类型的内置文档说明。

  2. 创建缓存:

    1. 前往系统 → 查询表 → 缓存。

    2. 点击创建缓存并选择缓存类型。

    3. 填写缓存配置表单。查阅表单中包含的缓存专用文档。

      注意

      除非在配置时选择忽略空结果,否则空结果会被缓存。

  3. 创建查询表:

    1. 前往 系统 > 查询表 .

    2. 选择 创建查询表 .

    3. 选择数据适配器和缓存实例,并可选择定义默认值。

      注意

      当查找未返回结果时使用默认值。若在查找表中未找到对应键值, 安全数据湖 将自动返回定义的默认值。

创建后,可在提取器、装饰器和管道规则中引用该查找表。

用法

查找表可应用于 安全数据湖 的多个领域以增强数据上下文:

  • 转换器 – 在消息摄取期间对提取值执行查找。

  • 装饰器 – 在搜索时丰富消息内容而不修改存储数据。

  • 管道规则 – 通过 lookup()lookup_value() 函数动态应用逻辑。

内置数据适配器

安全数据湖 预置多种即用型数据适配器。每种类型在 编辑数据适配器 表单中均有屏幕文档说明。

适配器

描述

CSV文件适配器

从静态CSV文件执行键值查找。

DNS查询适配器

执行主机名与IP解析(A、AAAA、PTR及TXT记录)。

DSV文件适配器

类似于CSV,但支持自定义分隔符和可配置的键/值列。

HTTPS JSONPath适配器

执行GET请求并使用JSONPath表达式提取数据。

Geo IP – MaxMind

利用MaxMind数据库提供IP地址的地理定位数据。

MongoDB

安全数据湖 新增对MongoDB数据适配器的支持,这些适配器将查询数据直接存储于 安全数据湖 配置数据库中。可通过API、图形界面或管道函数添加、更新或删除条目。

通过API管理MongoDB数据适配器

添加键的curl请求示例:

curl -u <token>:token \  
-H 'X-Requested-By: cli' \  
-H 'Accept: application/json' \  
-H 'Content-Type: application/json' \  
-X POST 'http://127.0.0.1:9000/api/plugins/org.graylog.plugins.lookup/lookup/adapters/mongodb/mongodb-data-name' \
--data-binary '{
    "键": "myIP",
    "值": ["12.34.42.99"],
    "数据适配器ID": "5e578606cdda4779dd9f2611"
  }'

注意

条目也可直接从 安全数据湖 界面管理,或通过管道规则使用查询相关函数动态修改。

提示

若需在图形界面中为单个键添加多个值,请用换行符分隔每个值。

地理定位

安全数据湖 支持从日志中的IP地址提取并可视化地理定位信息。

本文提供逐步指导,说明如何配置地理定位处理器并利用提取的数据创建地图。

配置处理器

安全数据湖 默认具备地理定位功能,但 仍需额外配置 。本节详细说明如何配置该功能。

注意

需创建账户获取许可证密钥以下载MaxMind数据库。更多信息请访问 MaxMind的博客文章

配置处理器

您需要配置 安全数据湖 以开始使用地理位置数据库解析日志中的IP地址。

  1. 导航至 系统 > 配置 .

  2. 选择 插件 > 地理位置处理器 ,然后点击 编辑配置 .

  3. 勾选 启用地理位置处理器 复选框。

  4. 从下拉菜单中选择MaxMind或IPInfo。

  5. 输入您使用的城市数据库和ASN数据库的路径。您还可以调整刷新间隔。

  6. 选择 更新配置 以保存配置。

Illuminate与地理位置

地理位置配置需在 安全数据湖 开启状态下使用。Illuminate 无需 需使用地理位置数据。

若需在Illuminate内容中获取地理位置数据,必须确保在消息处理器配置中,Illuminate处理器运行于GeoIP解析器之前。请注意该顺序应为默认设置。

检查当前环境配置:

  1. 进入 系统 > 配置 .

  2. 选择 消息处理器 ,随后在表格中确认顺序。

    如需调整顺序:

    1. 选择 编辑配置 .

    2. 通过拖放按需调整列表中项目顺序。

    3. 选择 更新配置 .

强制 安全数据湖 架构选项

配置地理位置处理器时,默认会选中 强制默认架构 选项。若禁用架构强制,所有非保留IP地址的IP字段都将被处理,并自动添加以下前缀字段:

  • _地理位置

  • _国家代码

  • _城市名称

源IP 字段为例,生成字段可能显示为:

  • 源IP_城市名称 : 维也纳

  • source_ip_country_code : AT

  • source_ip_geolocation : 48.20849, 16.37208

若启用模式强制,则仅处理以下非保留IP地址的GIM模式字段:

  • destination_ip

  • destination_nat_ip

  • event_observer_ip

  • host_ip

  • network_forwarded_ip

  • source_ip

  • source_nat_ip

以下是为 source_ip 字段生成的示例内容:

  • source_as_number : AS1853

  • source_as_organization : ACONET

  • source_geo_city : 维也纳

  • source_geo_coordinates : 48.20849, 16.37208

  • source_geo_country_iso : AT

  • source_geo_name : 维也纳, AT

  • source_geo_region : 维也纳

  • source_go_timezome : Europe/Vienna

在AWS S3中存储地理定位数据库文件

一个配置选项 从S3存储桶拉取文件 位于配置页面底部,允许从AWS S3存储桶拉取地理定位数据库文件。启用此功能后,可将S3存储桶URL添加到路径配置值中。

Geolocation Update Configuration.png

启用 时,服务会在每个刷新间隔运行,并轮询提供的S3存储桶中的文件。如果这些文件自上次轮询后已更新,则新文件将被拉取到每个节点上。该服务依赖于 默认凭证提供程序 获取S3存储桶的凭证,不使用任何可能在 安全数据湖 AWS插件配置设置中设置的配置值。

从S3获取的地理定位数据库文件存储在 安全数据湖 data_dir 目录下的 geolocation 子目录中。要更改这些文件的下载位置,请在 geo_ip_processor_s3_download_location 中设置磁盘上的所需位置,位于 安全数据湖 服务器配置文件中。

如果保留 禁用 从S3存储桶拉取文件的选项,所有 安全数据湖 节点将从磁盘上的路径读取文件,并需要手动更新这些文件以获取更新。

在地图中可视化地理定位

安全数据湖 可显示存储在任何字段中的地理位置地图,只要地理坐标点采用 纬度,经度 格式。

在搜索结果页面显示地图

在任何搜索结果页面,您都可以在搜索侧边栏展开要用于绘制地图的字段。点击左侧边栏的创建按钮(+),在通用菜单下选择聚合。

这将生成一个空的聚合组件。点击 编辑 并输入信息。选择世界地图作为可视化类型,随后即可看到该字段中存储的所有坐标点构成的地图。

您可以点击 更新预览 查看地图效果,并在点击'更新组件'前进行修改。

Geolocation Map1.png

注意

添加度量指标会影响地图上圆点的大小。若未定义指标,所有圆点半径相同。

关于 安全数据湖 中涉及不同地理坐标来源的其他字段,请查阅 安全数据湖 模式 .

将地图添加到仪表板

您可以像添加其他组件一样,将地图可视化添加到任意仪表板。在搜索结果页面显示地图时:

  1. 点击右上角的三点图标

  2. 选择 导出到仪表板 .

随后可对新仪表板进行重命名、编辑和保存。

数据适配器

ThreatFox威胁指标追踪数据适配器

ThreatFox是 abuse.ch 该项目追踪与恶意软件相关的危害指标(IOCs)。ThreatFox数据适配器支持通过以下关键类型进行查询:

  • URL

  • 域名

  • IP:端口

  • MD5哈希值

  • SHA256哈希值

创建数据适配器时,ThreatFox会将数据集下载并存储到MongoDB中。 刷新间隔 配置参数用于确定何时获取新数据集。

示例查询数据

对文件哈希值 923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4 的查询可能产生以下输出:

{
  "首次出现时间(UTC)": "2021-07-07T17:03:57.000+0000",
  "IOC编号": "158365",
  "IOC值": "923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4",
  "IOC类型": "sha256哈希",
  "威胁类型": "载荷",
  "关联恶意软件": "win.agent_tesla",
  "恶意软件别名": [
    "AgenTesla",
    "AgentTesla",
    "Negasteal"
  ],
  "恶意软件显示名": "Agent Tesla",
  "置信度": 50,
  "参考链接": "https://twitter.com/RedBeardIOCs/status/1412819661419433988",
  "标签": [
    "agenttesla"
  ],
  "匿名提交": false,
  "提交者": "Virus_Deck"
}
配置数据适配器
  • 标题

    • 数据适配器的简短标题。

  • 描述

    • 数据适配器的描述信息。

  • 名称

    • 数据适配器的唯一名称。

  • 自定义错误TTL

    • 用于缓存错误结果的自定义TTL(可选)。默认值为5秒。

  • 包含超过90天的IOCs

    • 可选设置,包含超过90天的IOCs。默认情况下,数据适配器的数据不包含超过90天的IOCs。为避免误报,请谨慎处理超过90天的IOCs。

  • 刷新间隔 - 决定获取新数据的频率。最小刷新间隔为3600秒(1小时),因为源数据的更新频率就是每小时一次。

  • 不区分大小写查询 - 允许数据适配器执行不区分大小写的查询。

URLhaus恶意软件URL数据适配器

URLhaus是一个来自 abuse.ch 的项目,负责维护用于恶意软件分发的恶意URL数据库。创建数据适配器时,URLhaus会下载并将相应数据集存储于MongoDB中。 刷新间隔 配置用于设定获取新数据集的时间。

示例查询数据

对URL https://192.168.100.100:35564/Mozi.m 的查询可能产生如下输出:

{
  "single_value": "malware_download",
  "multi_value": {
    "date_added": "2021-06-22T17:53:07.000+0000",
    "url_status": "online",
    "threat_type": "malware_download",
    "tags": "elf,Mozi",
    "url": "http://192.168.100.100:35564/Mozi.m",
    "urlhaus_link": "https://urlhaus.abuse.ch/url/1234567/"
  },
  "string_list_value": null,
  "has_error": false,
  "ttl": 9223372036854776000
}
配置数据适配器
  • 标题

    • 数据适配器的简短标题。

  • 描述

    • 数据适配器的说明。

  • 名称

    • 引用数据适配器的唯一名称。

  • 自定义错误TTL

    • 用于缓存错误结果的可自定义TTL。若未指定值,则默认为5秒。

  • URLhaus数据源类型

    • 决定数据适配器将使用哪个URLhaus数据源。

    • 在线URL 是较小数据集,仅包含当前检测为在线的URL。

    • 最近添加的URL 是较大数据集,包含最近30天内添加的所有在线和离线URL。

  • 刷新间隔 - 决定获取新数据的频率。最小刷新间隔为300秒(5分钟),因为源数据更新频率即为此间隔。

  • 不区分大小写查询 - 允许数据适配器执行不区分大小写的查询。