流水线
流水线是 安全数据湖 日志消息处理系统的核心组件,为评估、修改和路由传入数据提供结构化框架。它们定义了消息摄取后应用的处理步骤序列,确保日志数据得到一致、高效且定制化的处理。
每条流水线由一系列 规则 组成,这些规则可关联至 数据流 ,并且可以链接到一个或多个数据流。这种连接方式使您能够精确控制特定消息的处理方式和时机,从而实现对数据丰富化、规范化和路由的细粒度管控。
提取器
提取器 是 安全数据湖 的遗留功能,最初用于在日志消息摄入时进行处理和解析。建议改用更健壮、可定制的管道功能进行集中式消息处理。管道具有灵活性,支持条件逻辑,并能跨多个输入源运作。
核心概念
管道
管道是由多个规则按阶段组织而成的集合。当绑定到数据流时,进入该流的消息会按照预定义的阶段顺序通过所有连接的管道进行处理。
管道规则
管道规则定义了消息处理逻辑。
它们可以:
-
将消息路由到不同数据流
-
通过添加或修改字段实现数据丰富化
-
转换消息内容
-
规范化消息格式以实现统一搜索与分析
一组协同运作的关联规则构成完整的管道工作流。
函数
函数 是管道规则的基础构件。每个函数执行特定操作(如解析文本、检查字段值或更改消息内容),并可接受参数控制其行为。 函数返回的结果会影响后续规则对消息的处理方式。
阶段
管道划分为多个阶段,每个阶段包含一个或多个规则。阶段按数字顺序依次执行。
所有相同优先级的阶段会在所有连接的管道中并行执行。这种结构支持构建多步骤工作流,例如:在一个阶段解析消息,在另一阶段进行数据丰富化,最终在最后阶段完成路由。
数据流
每个管道必须至少连接一个 数据流 ,这决定了管道处理哪些消息。
进入数据流的消息会触发所有连接管道的执行。详见数据流说明文档。
管道规则逻辑
安全数据湖中的管道 是由管道规则构建的,这些规则定义了日志消息在被索引或存储之前如何被检查、转换和路由。 这些规则使用专用的领域专用语言(DSL),该语言提供了受控且易读的语法来定义处理逻辑,同时保持强大的运行时性能。
每条规则结合了一个条件和一个 动作 :
-
条件决定规则何时适用。
-
动作指定当条件满足时发生的操作。
理解数据类型对于编写规则至关重要。数据类型定义了字段所保存的值的种类(例如字符串、数字或IP地址)以及该值在规则中如何被操作。
管道规则是通过使用函数构建的——这些预定义的方法执行特定任务,如转换数据类型、操作字符串、解析JSON或检索 查找表 数据。 安全数据湖 包含了广泛的内置函数,帮助你有效地丰富、转换和管理日志数据。
规则可以通过 规则构建器 界面交互式创建和测试,或者由高级用户在源码编辑器中手动编写。
示例管道
下面的示例展示了管道及其规则的内部结构:
pipeline "My new pipeline" stage 1 match all rule "has firewall fields"; rule "from firewall subnet"; stage 2 match either rule "geocode IPs"; rule "anonymize source IPs"; end
该管道定义了两个阶段:
-
阶段1仅在所有列出的规则都为真时执行。
-
阶段2如果其中任一规则匹配,则执行(类似于或条件)。
阶段按升序运行,每个阶段可以引用可重用的规则。这允许模块化设计:像
hasfirewallfields
这样的规则可以在多个管道中共享,避免重复。
示例规则
以下是管道中引用的两个示例规则:
规则1
rule "has firewall fields"
when
has_field("src_ip") && has_field("dst_ip")
then
end
规则2
rule "来自防火墙子网"
when
cidr_match("10.10.10.0/24", to_ip($message.gl2_remote_ip))
then
end
两条规则均使用内置函数定义条件:
-
has_field()检查是否存在特定消息字段。 -
cidr_match()评估IP地址是否属于指定子网范围。
该
to_ip()
转换确保值被解析为IP地址而非字符串,体现了
安全数据湖
对规则验证的严格类型约束。
这些规则不包含动作(
then
为空),因其仅用于控制管道流程。
条件
规则的
when
子句是针对每条消息评估的布尔表达式。
支持逻辑运算符 AND(
&&
)、OR(
||
)和 NOT(
!
),以及比较运算符如
<
,
<=
,
>
,
>=
,
==
,以及
!=
.
例如:
has_field("src_ip") && cidr_match("10.0.0.0/8", to_ip($message.src_ip))
若条件引用了不存在的函数,则自动判定为
false
.
比较字段时,需确保两者类型相同,例如:
to_string($message.src_ip) == to_string($message.dst_ip)
操作
当条件判定为
then
时,
true
.
操作可以是:
-
函数调用,如
set_field("type","firewall_log"); -
变量赋值,如
letsubnet=to_string($message.network);
变量可用于存储临时值、避免重复计算并提升规则可读性。
保留字
规则语言中的特定标记为保留字,不可用作变量名,包括:
-
所有 -
Either -
Pass -
And -
Or -
Not -
Pipeline -
Rule -
During -
Stage -
When -
Then -
End -
Let -
Match
例如:
let match = regex(a,b);
将因match是保留字而失败
数据类型
安全数据湖 在管道规则中强制类型安全以防止无效操作,支持以下内置数据类型:
|
数据类型 |
描述 |
|---|---|
|
字符串 |
UTF-8文本值 |
|
双精度浮点数 |
浮点数(Java Double类型) |
|
长整型 |
整型数字(Java Long类型) |
|
布尔型 |
真或假值 |
|
void |
无返回值的函数 |
|
ip |
IP地址(InetAddress的子集) |
插件可定义额外类型。以
to_
为前缀的转换函数(例如
to_string()
,
to_ip()
,
to_long()
)确保类型正确处理。完整函数列表请参阅
函数参考手册
.
注意
在比较或使用函数前,始终将消息字段转换为正确类型。 例如:
set_field("timestamp", to_string(`$message.@extracted_timestamp`));
构建流水线规则
流水线由规则定义,这些规则决定消息通过 安全数据湖 时的处理方式。每条规则通过条件与动作的组合,使您能基于特定标准过滤、丰富、转换或路由日志数据。
创建规则时,需使用规则构建器或源代码编辑器定义逻辑,将“when”条件与“then”动作结合,精确描述消息处理方式。规则创建后,可加入流水线、分阶段组织并连接至数据流——实现从摄取到存储的灵活自动化消息处理控制。
本文概述创建和管理流水线规则的流程。
配置消息处理器
开始构建流水线规则前,请确保 消息处理器 已启用并正确配置:
-
G 前往 系统 > 配置 .
-
选择 消息处理器 .
-
选择 编辑配置 并启用 管道处理器 勾选其旁边的复选框。
-
拖动 管道处理器 使其位于 消息过滤链 之后。使用左侧的六个点进行拖动。
-
点击 更新配置 .
创建和管理规则
规则可通过 规则构建器 或 源代码编辑器 .
规则构建器(默认视图)提供了一种引导式的可视化规则创建方式。
要切换至手动编辑模式,请从创建菜单中选择 使用源代码编辑器 。
警告
您可以将规则从规则构建器转换为源代码编辑器,但无法逆向转换。
使用规则构建器创建规则
规则构建器提供了一种可视化、结构化的方法,可直接在 安全数据湖 界面中编写规则。
每条规则遵循简单的“当→则”模式:
-
当 用于定义触发规则的条件。
-
则 用于定义满足条件时执行的操作。
两个输入框均支持可搜索的下拉菜单。输入函数名的前几个字母会显示建议和简短描述。完整函数列表请参阅《函数文档》。
使用规则构建器创建规则的步骤如下:
-
前往 系统 > 管道 > 管理规则 .
-
选择 创建规则 .
-
创建when语句。
-
(可选)您可以添加其他语句,并使用 与 或 或 运算符进行组合,这些运算符可从 当 部分的右上角区域选择。
-
创建一个 然后 语句以指定操作。
如果“然后”语句产生一个值,输出变量将自动显示,并可在后续语句中重复使用。
注意
规则可立即在 规则模拟 模块中进行测试。
使用源代码编辑器创建规则
您也可以手动编写规则,在 源代码编辑器 中使用“当”和“然后”语句。此视图支持完整的语法编辑,并包含函数及其描述的快速参考列表。
要使用源代码编辑器创建规则,请遵循以下规则:
-
转到 系统 > 管道 > 管理规则 .
-
选择 创建规则 .
-
选择 使用源代码编辑器 (位于页面右上角)。
-
配置规则。
注意
请参阅 管道规则逻辑 以获取语法详情。
-
选择 创建规则 .
与规则构建器类似,您可以在保存前通过规则模拟模块验证规则。
模拟管道规则
模拟功能允许您在部署前测试规则。您可以模拟完整消息或单个字段。在模拟框中输入原始消息字符串、键值对或JSON有效载荷。
模拟器会逐步显示分配的输出变量和处理结果。
提示
每条规则会保存最后使用的消息,因此随时可用于模拟。
运行模拟的步骤如下:
-
前往 系统 > 管道 > 模拟器 .
-
选择“运行规则模拟”。
-
输入示例消息。
-
查看处理后的输出结果。
-
如有需要,可重置或调整规则后重新运行。
管理流水线
创建规则后,您可将其组合成处理并丰富消息的流水线。请前往 系统 > 流水线 > 管理流水线 以创建、编辑或删除流水线。
每个流水线包含一个或多个定义执行顺序和逻辑的阶段。
创建流水线
新建流水线需遵循以下步骤:
-
前往 系统 > 流水线 > 管理流水线 .
-
点击屏幕右上角的 添加新流水线 按钮。
-
为流水线输入描述性名称和说明,并选择???
-
在 编辑连接 下方的 流水线连接 区域中
选择 编辑连接 窗口已显示。
-
在 流 字段下,选择要附加的流。
注意
管道仅作用于其所连接流中的消息。多个管道可处理同一流;其规则按阶段优先级运行。
提示
“ 所有消息 ”流是所有传入数据的默认入口点,也是处理路由、过滤或字段丰富化的通用管道的理想位置。
选择后,它们将被添加到菜单下方的列表中。您可以选择 移除 将其从列表中删除。
-
选择 添加新阶段 并配置阶段:
-
在 阶段 下输入阶段优先级,该值决定管道在序列中的执行顺序。此数字可为任意整数,数值较小的阶段优先运行。
-
选择如何处理后续阶段的规则:
-
本阶段所有规则均匹配消息 - 仅当满足所有条件时继续下一阶段。
-
本阶段至少一条规则匹配消息 - 满足任一条件即继续下一阶段
-
本阶段无规则或任意数量规则匹配 - 仅当不满足任何条件时继续下一阶段。
-
-
在阶段规则下,选择要应用的规则。
-
选择 添加阶段 保存信息。
-
-
如需添加更多阶段,请继续操作。
注意
每个新建阶段会在 流水线 菜单下新增一个版块。点击 编辑 修改阶段详情,或点击 删除 移除该阶段。
添加所有阶段后,流水线即构建完成,将显示在流水线页面。连接至数据流后,系统将根据您定义的规则与逻辑自动开始处理传入消息。
数据流测试与流水线模拟
使用流水线模拟器预览消息在当前流水线设置下的处理过程。测试数据流步骤如下:
-
进入 系统 > 流水线 > 模拟器 .
-
在 数据流 下选择待测数据流。
-
在 原始消息 中输入与传入日志格式相同的原始样本消息(例如GELF格式消息)。
-
(可选)指定源IP、输入类型及编解码器(日志消息解析机制)。
模拟执行后将显示:
-
变更摘要 ——列出被修改、新增或删除的字段。
-
结果预览 – 显示完整处理后的消息。
-
模拟追踪 – 详细记录执行的规则和管道及其耗时。
编辑与管道阶段
所有管道均显示在 系统 > 管道 > 管理管道 页面下。针对每个管道,您可选择 删除 移除管道,或 编辑 修改其配置。
应用场景
本文介绍在 安全数据湖 中创建和应用管道规则的实际用例。这些示例演示了如何过滤无效日志、丰富消息数据,并将消息路由至特定流或告警系统。参考这些场景来设计和实施高效管道规则,以优化日志数据的处理与分析流程。
|
规则 |
条件示例 |
动作示例 |
规则语法 |
|---|---|---|---|
|
匿名化处理 管道规则可在消息存储或转发前对敏感数据进行编辑或删除。 通过掩码IP地址、用户名或个人标识符等信息,确保符合隐私标准。 |
检查是否存在
|
移除
|
规则 "屏蔽敏感信息"
当
存在字段("source_ip")
则
移除单个字段("source_ip");
结束
|
|
面包屑 面包屑规则为消息添加元数据,以便跨系统或阶段追踪其流向。 这些规则常用于调试、标记或跟踪消息处理过程。 |
留空。这将确保规则适用于所有传入消息。 |
添加或更新字段
|
规则 "设置演示字段"
当
真
则
设置字段("rule_demo", "test");
结束
|
|
过滤 过滤规则可丢弃非必要消息,减少数据摄入量和许可证使用。 |
检查
|
完全丢弃该消息,防止其被存储或进一步处理。 |
规则 "丢弃测试消息"
当
存在字段("testing")
则
丢弃消息();
结束
|
|
修改 修改规则可变更消息内容,例如重新格式化时间戳或更新字段值。 |
检查消息是否包含
|
将时间戳从UTC转换为英国时间,并写入名为
|
规则 "将event_time转换为英国时区"
当
存在字段("event_time")
则
令 event_time_date = 解析日期(
值: 转字符串($消息.event_time),
模式: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", // 根据需要调整此模式
时区: "UTC"
);
令 event_time_uk = 格式化日期(
值: event_time_date,
日期格式: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
时区: "Europe/London"
);
设置字段("event_time_uk", event_time_uk
);
结束
|
|
丰富 丰富规则通过添加额外上下文或更新现有数据值来增强消息。 |
检查字段
|
将该字段值更新为
|
规则 "SrcCountryUnitedStates"
当
存在字段("Src_ip_geo_country") &&
转字符串($消息.Src_ip_geo_country) == "US"
则
设置字段(
字段: "Src_ip_geo_country",
值: "UniSt",
清除字段: false
);
结束
|
|
路由 路由规则将特定消息发送至另一流,并可选地从当前流中移除。 提示在规则中引用目标流前,请预先创建该目标流。 |
检查
|
将消息路由至目标流(
|
规则 "路由消息至流"
当
存在字段("gl2_remote_ip") &&
转字符串($消息.gl2_remote_ip) == "66914166ac1d1568bad817f3"
则
路由至流(
名称: "我的第一个流",
从默认流移除: true
);
结束
|
管道函数
函数是管道规则的构建模块。每个函数都是预定义方法,在日志消息流经 安全数据湖 处理管道时对其执行特定操作。
函数可接受一个或多个参数,并返回决定消息如何转换、丰富或路由的输出结果。通过在条件和动作中组合函数,您能定义强大的处理逻辑,使消息处理符合组织需求。
完整支持的 安全数据湖 函数列表、描述及语法示例,请参阅 函数参考 .
语法
安全数据湖中的 管道函数 采用Java实现并设计为可插拔架构,便于您轻松扩展平台的处理能力。
从概念上讲,函数接收参数(如当前消息上下文)并返回值。参数和返回值的类型决定了该函数在规则中的适用场景。 安全数据湖 会自动验证这些类型,确保所有规则在逻辑和语法上均正确无误。
函数参数可通过命名键值对或位置顺序传递,但需确保所有可选参数声明在最后并最后处理。
Java数据类型
管道规则在构建查询或执行计算时可以使用某些Java数据类型。这仅限于通过
GET
函数查询的类型。
例如,您可以使用
.millis
属性(属于
DateTime
和
Period
对象)来获取以毫秒为单位的时间值。
这使您能够执行精确的时间计算,例如测量消息相对于当前时间的存在时长。
规则 "时间差计算器(毫秒)"
当
条件为真
则
定义时间差 =
转换为长整型(
解析日期(
值: 转换为字符串(当前时间(时区: "欧洲/柏林")),
格式: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
地区: "de_DE"
).毫秒
)
-
转换为长整型(
解析日期(
值: 转换为字符串($消息.时间戳),
格式: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
地区: "de_DE"
).毫秒
);
设置字段("扫描时间差_毫秒", 时间差);
结束
本示例中,该规则计算当前时间("欧洲/柏林"时区)与消息时间戳之间的差值,并将两个值转换为毫秒。
计算结果(
扫描时间差_毫秒
)表示事件以毫秒为单位的存续时长,并作为新字段存储于消息中。
警告
安全数据湖 不支持使用任何未经官方文档记载的函数。若测试任何不受支持的函数数据类型,请谨慎操作。
函数类型
内置 安全数据湖 函数可按以下类型分类。完整函数列表及说明请参阅 函数参考 .
匿名化
匿名化函数可对数据集或日志消息中的敏感数据进行脱敏处理。
资产增强
资产增强函数可提升、检索或移除与资产相关的日志数据。详见 资产增强 获取更多信息 安全数据湖 安全功能。
布尔值
布尔数据主要与条件语句相关联,通过根据条件评估结果为
真
或
假
来改变控制流,从而允许不同的操作。布尔函数用于确定布尔值或运算符。
转换
转换函数用于将值从一种格式转换为另一种格式。
日期/时间
日期/时间函数对日期和时间值执行操作或计算。
调试
调试函数用于确定程序在任意执行点的状态。
编码
编码函数可用于解码和转换字符串。
列表
列表函数可创建或检索用于分析的可操作集合。
查找
查找函数允许您在数据库中搜索某个值,然后从同一记录返回附加信息。
映射
映射函数对集合中的每个或所有元素应用指定操作。
消息处理
消息处理函数定义对消息的响应操作。在构建管道规则时,这些函数用于日志数据的各种丰富、移除、检索和路由操作。
模式匹配
模式匹配函数用于指定数据应符合的模式,并根据这些模式解构数据。
字符串
字符串函数用于操作字符串或查询字符串相关信息。
监视列表
监视列表函数提供检索或修改监视列表的操作功能。
函数参考
下列列表描述了 安全数据湖 .
|
函数 |
类别 |
描述 |
语法 |
|---|---|---|---|
|
缩写 |
字符串 |
使用省略号缩写字符串。宽度参数定义结果字符串的最大长度。 |
|
|
abusech_ransom_lookup_domain |
字符串 |
将域名与abuse.ch勒索软件域名黑名单进行匹配
|
|
|
abusech_ransom_lookup_ip |
字符串 |
将IPv4或IPv6地址与abuse.ch勒索软件域名黑名单进行匹配
|
|
|
add_asset_categories |
资产增强 |
为资产添加分类列表 |
|
|
anonymize_ip |
匿名化 |
通过将最后一个八位字节设置为
|
|
|
array_contains |
消息处理 |
检查数组中是否包含指定元素 参见示例 |
|
|
array_remove |
消息处理 |
从数组中移除指定元素 参见示例 |
|
|
base16_decode |
字符串 |
提供返回小写字母的字符串base16解码功能。要求输入标准十六进制字符(0-9 A-F)。 |
|
|
base16_encode |
字符串 |
提供使用16字符子集的标准不区分大小写十六进制编码功能。要求输入标准十六进制字符(0-9 A-F)。 |
|
|
base32_decode |
字符串 |
使用32字符子集对字符串进行解码。采用"数字型"base32编码,从传统十六进制字母表扩展而来(0-9 A-V)。 |
|
|
base32_encode |
字符串 |
使用32字符子集对字符串进行编码。采用"数字型"base32编码,从传统十六进制字母表扩展而来(0-9 A-V)。 |
|
|
base32human_decode |
字符串 |
使用32字符子集解码人类可读格式的字符串。采用"易读型"base32编码,避免0/O或1/I的混淆(A-Z 2-7)。 |
|
|
base32human_encode |
字符串 |
使用32字符子集将字符串编码为人类可读格式。采用"易读型"base32编码,避免0/O或1/I的混淆(A-Z 2-7)。 |
|
|
base64_decode |
字符串 |
使用64字符子集对字符串进行解码。标准base64允许大小写字母混用,无需人类可读。 |
|
|
base64_encode |
字符串 |
使用64字符子集解码字符串。常规base64允许大小写字母,无需具备可读性。 |
|
|
base64url_decode |
字符串 |
提供URL安全的64字符子集字符串解码,适合作为文件名或直接传入URL而无需转义。 |
|
|
base64url_encode |
字符串 |
提供URL安全的64字符子集字符串编码,适合作为文件名或直接传入URL而无需转义。 |
|
|
capitalize |
字符串 |
将字符串首字母转为大写形式。 |
|
|
cidr_match |
布尔值/消息函数 |
检查给定IP地址对象是否匹配cidr模式。 另请参阅: to_ip |
|
|
clone_message |
消息处理 |
克隆消息。若
|
|
|
concat |
字符串 |
返回一个合并了
参见示例 |
|
|
contains |
字符串 |
检查字符串是否包含另一字符串(忽略大小写)。 参见示例 |
|
|
crc32 |
字符串函数/编码 |
返回给定字符串的十六进制CRC32摘要值。 |
|
|
crc32c |
字符串函数/编码 |
返回给定字符串的十六进制CRC32C摘要值(符合RFC 3720第12.1节规范)。 |
|
|
create_message |
消息处理 |
根据给定参数创建新消息。若省略任何参数,则从当前处理消息的对应字段中取值。若
|
|
|
csv_to_map |
转换 |
将CSV字符串的单行转换为可供
另请参阅: set_fields |
|
|
days |
日期/时间 |
创建一个时间周期,长度为
|
|
|
debug |
调试 |
将传入值以字符串形式打印到 安全数据湖 日志中。注意调试信息仅会出现在处理待调试消息的 安全数据湖 节点日志中。 查看示例 |
|
|
drop_message |
消息处理 |
移除给定的
参见示例 |
|
|
ends_with |
字符串 |
检查
参见示例 |
|
|
expand_syslog_priority |
转换 |
将 syslog优先级数值 转换为其对应的级别和设施。 |
|
|
expand_syslog_priority_as_string |
转换 |
将 syslog优先级数值 转换为对应的严重性和设施字符串表示形式。 |
|
|
first_non_null |
列表 |
返回指定列表中首个非空元素
|
|
|
flatten_json |
字符串 |
将
|
|
|
flex_parse_date |
日期/时间 |
使用
Natty日期解析器
解析日期和时间
另请参阅: is_date |
|
|
format_date |
日期/时间 |
返回给定的日期和时间
|
|
|
from_forwarder_input |
消息处理 |
检查当前处理的消息是否通过指定转发器输入接收。可通过输入
|
|
|
from_input |
消息处理 |
检查当前处理的消息是否通过指定(非转发器)输入接收。可通过输入
|
|
|
get_field |
消息处理 |
获取
|
|
|
grok |
模式匹配 |
应用grok模式
另请参阅: set_fields |
|
|
grok_exists |
布尔值 |
检查给定的Grok模式是否存在。
|
|
|
has_field |
布尔/消息函数 |
检查给定的
|
|
|
小时 |
日期/时间 |
创建一个持续
|
|
|
in_private_net |
消息处理 |
检查IP地址是否属于RFC 1918(10.0.0.0/8、172.16.0.0/12、192.168.0.0/16)或RFC 4193(fc00::/7)定义的私有网络。 |
|
|
is_bool |
布尔型 |
检查给定的
|
|
|
is_collection |
布尔值 |
检查给定的
|
|
|
is_date |
布尔值 |
检查给定的
另请参阅: now , parse_date , flex_parse_date , parse_unix_milliseconds |
|
|
is_double |
布尔值 |
检查给定的
另请参阅: to_double |
|
|
is_ip |
布尔值 |
检查给定的
参见: to_ip |
|
|
is_json |
布尔值 |
检查给定的
参见: parse_json |
|
|
is_list |
布尔值 |
检查
|
|
|
is_long |
布尔值 |
检查
另请参阅: to_long |
|
|
is_map |
Boolean |
检查给定的
另请参阅: to_map |
|
|
is_not_null |
Boolean |
检查
参见示例 |
|
|
is_null |
Boolean |
检查
参见示例 |
|
|
is_number |
布尔型 |
检查给定
|
|
|
is_period |
布尔型 |
检查给定
|
|
|
是否为字符串 |
布尔值 |
检查
另请参阅: 转为字符串 |
|
|
是否为URL |
布尔值 |
检查给定
另请参阅: 转为URL |
|
|
拼接 |
字符串 |
将指定范围内的数组元素连接为单个字符串。起始索引默认为
|
|
|
键值对 |
布尔值 |
从给定
另请注意,执行
另请参阅: set_fields |
|
|
length |
字符串 |
计算字符串中的字符数。若bytes=true,则改为计算字节数(假设采用UTF-8编码)。 |
|
|
list_count |
List |
获取列表中元素的数量。 |
|
|
list_get |
List |
从列表中获取值。 |
|
|
lookup |
Lookup |
在指定查找表中查询多值。 参见示例 |
|
|
lookup_add_string_list |
Lookup |
在指定查找表中添加字符串列表,成功时返回更新后的列表,失败时返回
|
|
|
lookup_all |
Lookup |
在指定查找表中查询所有提供的值,并以数组形式返回全部结果。 参见示例 |
|
|
lookup_assign_ttl |
查找 |
为指定查找表中的键添加生存时间。成功时返回更新后的条目,
|
|
|
lookup_clear_key |
查找 |
清除(移除)指定查找表中的键。 此函数目前仅支持 MongoDB查找表 (截至本文撰写时)。 |
|
|
lookup_has_value |
查找 |
判断给定
|
|
|
lookup_remove_string_list |
查找 |
从指定查找表中移除给定字符串列表的条目。成功时返回更新后的列表,
|
|
|
lookup_set_string_list |
查找 |
在指定的查找表中设置一个字符串列表。成功时返回新值,失败时返回
|
|
|
lookup_set_value |
查找 |
在指定的查找表中设置单个值。成功时返回新值,失败时返回
|
|
|
lookup_string_list |
查找 |
在指定的查找表中查找字符串列表值。 此函数目前仅支持 MongoDB查找表 。 |
|
|
lookup_string_list_contains |
布尔 |
查找
|
|
|
lookup_value |
查找 |
查找单个
参见示例 |
|
|
lowercase |
字符串 |
将
|
|
|
machine_asset_lookup |
资产丰富 |
查找单个机器资产。如果多个资产匹配输入参数,则仅返回一个。 |
|
|
machine_asset_update |
资产丰富 |
更新机器资产的IP或MAC地址。如果多个资产匹配输入参数,则仅选择一个。 |
|
|
map_copy |
映射 |
从映射中检索一个值。 |
|
|
map_get |
映射 |
将映射复制到新映射。 |
|
|
map_remove |
映射 |
从映射中移除一个键。 |
|
|
map_set |
映射 |
设置映射中的一个键。 |
|
|
md5 |
字符串 |
生成值的十六进制编码MD5摘要
|
|
|
metric_counter_inc |
调试 |
统计特定指标条件。计数器指标名称
|
|
|
毫秒 |
日期/时间 |
创建一个时间周期,其值为
|
|
|
分钟 |
日期/时间 |
创建一个时间周期,其值为
|
|
|
月份 |
日期/时间 |
创建一个时间周期,其值为
|
|
|
multi_grok |
对字符串应用一组Grok模式并返回首个匹配项。 参见示例 |
|
|
|
murmur3_128 |
编码 |
生成输入值的十六进制MurmurHash3(128位)摘要。
|
|
|
murmur3_32 |
编码 |
生成输入值的十六进制MurmurHash3(32位)摘要。
|
|
|
normalize_fields |
消息处理 |
将所有字段名转为小写以实现标准化。 |
|
|
now |
日期/时间 |
返回当前
另请参阅: is_date |
|
|
otx_lookup_domain |
字符串 |
查询域名在AlienVault OTX威胁情报数据中的信息。需要配置名为
参见示例 |
|
|
otx_lookup_ip |
字符串 |
查询IPv4或IPv6地址在AlienVault OTX威胁情报数据中的信息。需要配置名为
参见示例 |
|
|
parse_cef |
字符串 |
将任何CEF格式的字符串解析为其字段。这是CEF字符串(以
|
|
|
parse_date |
日期/时间 |
使用给定的日期格式解析日期字符串。 |
|
|
parse_json |
字符串 |
解析
另请参阅: to_map |
|
|
parse_unix_milliseconds |
日期/时间 |
尝试将UNIX毫秒时间戳(自1970-01-01T00:00:00.000Z以来的毫秒数)解析为正确的
另请参阅: is_date 参见示例 |
|
|
period |
日期/时间 |
从
|
|
|
正则表达式 |
模式匹配 |
使用Java语法将字符串与正则表达式进行匹配。 |
|
|
regex_replace |
模式匹配 |
将value与pattern中的正则表达式进行匹配,若匹配成功则用
参见示例 |
|
|
remove_asset_categories |
资产增强 |
从资产中移除分类列表。 |
|
|
remove_field (legacyDeprecated) |
消息处理 |
从给定
替代方案参见: remove_single_field , remove_multiple_fields |
|
|
remove_from_stream |
消息处理 |
从指定流中移除
如需彻底丢弃消息,请使用
|
|
|
remove_multiple_fields |
消息处理 |
移除匹配正则表达式模式及/或名称列表的字段,除非字段名被保留。 |
|
|
remove_single_field |
消息处理 |
从消息中移除单个字段,除非字段名被保留。 |
|
|
rename_field |
消息处理 |
修改字段名称
|
|
|
replace |
字符串 |
替换字符串中首次出现的
参见示例 |
|
|
路由到流 |
消息处理 |
将消息的流分配设置为指定流。功能等同于'复制'且不会从当前流移除消息。若未指定
参见示例 |
|
|
秒数 |
日期/时间 |
创建以
|
|
|
选择_json路径 |
映射 |
针对JSON树评估给定的
|
|
|
设置关联资产 |
资产增强 |
添加关联资产信息。 |
|
|
设置字段 |
消息处理 |
将指定的
另请参阅: 设置字段集 |
|
|
设置字段集 |
消息处理 |
将给定的所有名称-值对设置到指定消息的
|
|
|
sha1 |
编码 |
生成该值的十六进制编码SHA1摘要
|
|
|
sha256 |
编码 |
生成该值的十六进制编码SHA256摘要
|
|
|
sha512 |
编码 |
生成该值的十六进制编码SHA512摘要
|
|
|
spamhaus_lookup_ip |
查询 |
将IP地址与Spamhaus DROP和EDROP列表进行匹配。 |
|
|
split |
字符串 |
根据给定模式分割字符串。使用Java语法。 |
|
|
starts_with |
字符串 |
检查
参见示例 |
|
|
string_array_add |
字符串 |
将指定的字符串(或字符串数组)
参见示例 |
|
|
string_entropy |
字符串 |
计算给定字符串中字符分布的香农熵。 |
|
|
substring |
字符串 |
返回
参见示例 |
|
|
swapcase |
字符串 |
交换字符串的大小写
|
|
|
syslog_facility |
转换 |
将
syslog设施编号
在
|
|
|
syslog_level |
转换 |
将
syslog严重性编号
在
|
|
|
threat_intel_lookup_domain |
查找 |
将域名与所有启用的威胁情报源(OTX除外)进行匹配。 |
|
|
threat_intel_lookup_ip |
查找 |
将IP地址与除OTX外所有启用的威胁情报源进行匹配。 |
|
|
to_bool |
转换 |
使用参数的字符串值将其转换为布尔值。 |
|
|
to_date |
转换 |
将
另请参阅: is_date |
|
|
to_double |
转换 |
将第一个参数转换为双精度浮点数值。 |
|
|
to_ip |
转换 |
将给定的
另请参阅: cidr_match |
|
|
to_long |
转换 |
将第一个参数转换为长整型数值。 |
|
|
to_map |
转换 |
将给定的类映射值转换为有效映射。
另请参阅: set_fields , parse_json 参见示例 |
|
|
to_string |
转换 |
将第一个参数转换为其字符串表示形式。 |
|
|
to_url |
转换 |
将给定的
|
|
|
tor_lookup |
查询 |
将IP地址与已知的Tor出口节点进行匹配,以识别来自Tor网络的连接。 |
|
|
流量统计大小 |
消息处理 |
计算整个消息的大小,包括所有额外字段。该值也用于确定消息对许可证使用量的贡献程度。 参见示例 |
|
|
首字母小写 |
字符串 |
将字符串的首字母转换为小写。 |
|
|
大写 |
字符串 |
将字符串转换为大写。区域设置(IETF BCP 47语言标签)默认为
|
|
|
URL解码 |
字符串 |
使用特定的编码方案解码application/x-www-form-urlencoded字符串。 |
|
|
URL编码 |
字符串 |
使用特定的编码方案将字符串转换为application/x-www-form-urlencoded格式。有效的字符集包括,例如,
|
|
|
用户资产查询 |
资产丰富化 |
查询单个用户资产。若多个资产匹配输入参数,仅返回其中一个。 |
|
|
监视列表添加 |
监视列表 |
将值添加到指定类型的监视列表中。成功时返回
|
|
|
监视列表包含 |
监视列表 |
在指定类型的监视列表中查找值。成功时返回
|
|
|
监视列表移除 |
监视列表 |
从指定类型的监视列表中移除值。成功时返回
|
|
|
周 |
日期/时间 |
创建一个时间周期,长度为
|
|
|
whois_lookup_ip |
查询 |
检索IP地址的WHOIS信息 |
|
|
年 |
日期/时间 |
创建一个时间周期,长度为
|
|
示例
|
函数 |
示例 |
|---|---|
|
array_contains |
rule "array_contains"
when
true
then
set_field("contains_number", array_contains([1, 2, 3, 4, 5], 1));
set_field("does_not_contain_number", array_contains([1, 2, 3, 4, 5], 7));
set_field("contains_string", array_contains(["test", "test2"], "test"));
set_field("contains_string_case_insensitive", array_contains(["test", "test2"], "TEST"));
set_field("contains_string_case_sensitive", array_contains(["test", "test2"], "TEST", true));
end
|
|
array_remove |
规则 "array_remove"
当
真
则
设置字段("remove_number", 数组移除([1, 2, 3], 2));
设置字段("remove_string", 数组移除(["one", "two", "three"], "two"));
设置字段("remove_missing", 数组移除([1, 2, 3], 4));
设置字段("remove_only_one", 数组移除([1, 2, 2], 2));
设置字段("remove_all", 数组移除([1, 2, 2], 2, 真));
结束
|
|
连接 |
令 构建消息_0 = 连接(转字符串($消息.协议), " 连接自 ");
令 构建消息_1 = 连接(构建消息_0, 转字符串($消息.源IP));
令 构建消息_2 = 连接(构建消息_1, " 至 ");
令 构建消息_3 = 连接(构建消息_2, 转字符串($消息.目标IP));
令 构建消息_4 = 连接(构建消息_3, " 端口 ");
令 构建消息_5 = 连接(构建消息_4, 转字符串($消息.目标端口));
设置字段("message", 构建消息_5);
|
|
包含 |
包含(转字符串($消息.主机名), "example.org", 真) |
|
调试 |
丢弃来自<来源>的消息"令 调试消息 = 连接("丢弃来自 ", 转字符串($消息.来源));调试(调试消息);`
|
|
丢弃消息 |
规则 "丢弃超过16383字符的消息"
当
有字段("message") 且
正则匹配(模式: "^.{16383,}$", 值: 转字符串($消息.message)).匹配 == 真
则
丢弃消息();
// 添加调试消息以通知被丢弃的消息
调试( 连接("丢弃过大的消息来自 ", 转字符串($消息.来源)));
结束
|
|
以...结尾 |
返回
以...结尾 ( "Foobar Baz Quux" , "quux" , 真 );
返回
以...结尾 ( "Foobar Baz Quux" , "Baz" ); ` |
|
grok存在 |
当
grok存在("USERNAME")
则
令 解析结果 = grok("%{USERNAME:username}", 转字符串($消息.message));
设置字段("parsed_username", 解析结果.username);
结束
|
|
十六进制转十进制字节列表 |
十六进制转十进制字节列表(值: "0x17B90004"); 返回: [23, 185, 0, 4] 十六进制转十进制字节列表(值: "0x117B90004"); 返回: [1, 23, 185, 0, 4] 十六进制转十进制字节列表(值: "17B90004"); 返回: [23, 185, 0, 4] 十六进制转十进制字节列表(值: "117B90004"); 返回: [1, 23, 185, 0, 4] 十六进制转十进制字节列表(值: "not_hex"); 返回:null |
|
is_not_null |
is_null(src_addr) |
|
lookup |
规则 "目标IP地理信息查询"
当
存在字段("dst_ip")
那么
令 geo = 查询("geoip-lookup", 转字符串($消息.dst_ip));
设字段("dst_ip_geolocation", geo["coordinates"]);
设字段("dst_ip_geo_country_code", geo["country"].iso_code);
设字段("dst_ip_geo_country_name", geo["country"].names.en);
设字段("dst_ip_geo_city_name", geo["city"].names.en);
结束
|
|
lookup_all |
规则 "批量查询函数"
当
true
那么
令 values = 批量查询("lut_name", ["key1", "key2", "key3"]);
设字段("values", values);
结束
|
|
lookup_value |
("ip_lookup", 转字符串($消息.src_addr));
|
|
multi_grok |
当
true
那么
设字段集(
字段集: 多模式匹配(
模式: [
"^ABC %{IPORHOST:msg_ip}: %{GREEDYDATA:abc_message}",
"^123 %{IPORHOST:msg_ip}: %{GREEDYDATA:123_message}",
"^ABC2 %{IPORHOST:abc_ip}: %{GREEDYDATA:abc_message}"
],
值: 转字符串($消息.message),
仅命名捕获: true
)
);
结束
|
|
otx_lookup_domain |
规则 "解析IP到DNS"
当
存在字段("source_ip")
且 正则匹配(
模式: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
值: 转字符串($消息.source_ip)
).匹配 == true
那么
令 rs = 查值("dns_lookups", 转字符串($消息.source_ip));
设字段("source_ip_dns", 转字符串(rs));
结束
|
|
otx_lookup_ip |
规则 "解析source_ip - OTX-API-IP"
当
// 验证消息有source_ip字段
存在字段("source_ip")
// 验证源IP为IPv4格式
且 正则匹配(
模式: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
值: 转字符串($消息.source_ip)
).匹配 == true
那么
令 rs = OTX查询IP(转字符串($消息.source_ip));
设字段集(rs);
结束
|
|
parse_unix_milliseconds |
设字段("timestamp", timestamp);
|
|
regex_replace |
令 username = 正则替换(".*user: (.*)", 转字符串($消息.message), "$1");
|
|
replace |
令 new_field = 替换(转字符串($消息.message), "oo", "u"); // "fu ruft uta" 令 new_field = 替换(转字符串($消息.message), "oo", "u", 1); // "fu rooft oota" |
|
route_to_stream |
路由到流(id: "512bad1a535b43bd6f3f5e86"); |
|
starts_with |
返回true: starts_with("Foobar Baz Quux", "foo", true);
返回false: starts_with("Foobar Baz Quux", "Quux");
|
|
string_array_add |
规则 "字符串数组添加"
当
true
那么
设字段("add_number_to_string_array_converted", 字符串数组添加(["1", "2"], 3));
设字段("add_number_array_to_string_array_converted", 字符串数组添加(["1", "2"], [3, 4]));
设字段("add_string", 字符串数组添加(["one", "two"], "three"));
设字段("add_string_again", 字符串数组添加(["one", "two"], "two"));
设字段("add_string_again_unique", 字符串数组添加(["one", "two"], "two", true));
设字段("add_array_to_array", 字符串数组添加(["one", "two"], ["three", "four"]));
结束
|
|
子字符串 |
= 截取字符串(转字符串($消息.消息), 0, 20); |
|
转为映射 |
let json = 解析JSON(转字符串($消息.json负载)); let map = 转为映射(json); 设置字段(map); |
|
流量统计大小 |
设置字段(
字段: "许可证使用量",
值: 流量统计大小() // 单位为字节
//值: 流量统计大小() / 1024 // 单位为千字节
);
|
日志增强
查找表
查找表允许您通过替换消息字段值或创建全新消息字段,来映射、转换或丰富日志数据。例如,您可以使用静态CSV文件将IP地址映射到主机名,或使用外部数据源为消息添加威胁情报、地理位置或资产信息。
此功能可通过内部系统或第三方集成的上下文增强原始日志数据,将其转化为更丰富、可操作的洞察。
组件
查找表系统由四个组件组成:
数据适配器用于实际查找值。它们可能从CSV文件读取、连接数据库或执行请求以获取查找结果。
数据适配器实现是可插拔的,可通过插件添加新适配器。
警告
CSV文件适配器会将整个文件内容读入堆内存。请确保相应调整堆内存大小。
缓存负责缓存查找结果以提高查找性能和/或避免过载数据库和API。它们是独立实体,使得可以为不同数据适配器重用缓存实现。这样,数据适配器无需关心缓存,也无需自行实现。
缓存实现是可插拔的,可通过插件添加新缓存。
提示
如果文件发生更改,CSV文件适配器会在每个检查间隔内刷新其内容。如果缓存被清除但检查间隔未到,查找可能会返回过期值。
查找表组件将数据适配器实例和缓存实例绑定在一起。它需要在转换器、管道函数和装饰器中使用查找表。
查找结果由查找表通过数据适配器返回,可包含两种类型的数据: 单值 和 多值 .
该 单值 可以是字符串、数字或布尔值,将用于转换器、装饰器和管道规则。在我们的CSV示例中查找IP地址对应的主机名时,该值即为主机名字符串。
而
多值
是一种映射或类字典数据结构,可包含多个不同值。当数据适配器能为一个键提供多个值时,此功能非常有用。例如geo-ip数据适配器不仅提供IP地址的经纬度,还包含该地理位置的城市和国家信息。当前多值仅能在使用
lookup()
管道函数时应用于管道规则中。
示例1: 包含单值与多值的CSV数据适配器输出示例。
示例2: 包含单值与多值的geo-ip数据适配器输出示例。
配置流程
您可在 系统 > 查找表 窗口配置查找表。
每个查找表至少需要一个数据适配器和一个缓存。
-
创建数据适配器:
-
前往 系统 > 查找表 > 数据适配器 .
-
选择 创建适配器 并选择数据适配器类型。
-
填写适配器配置表单,其中包含每种类型的内置文档说明。
-
-
创建缓存:
-
前往系统 → 查询表 → 缓存。
-
点击创建缓存并选择缓存类型。
-
填写缓存配置表单。查阅表单中包含的缓存专用文档。
注意
除非在配置时选择忽略空结果,否则空结果会被缓存。
-
-
创建查询表:
-
前往 系统 > 查询表 .
-
选择 创建查询表 .
-
选择数据适配器和缓存实例,并可选择定义默认值。
注意
当查找未返回结果时使用默认值。若在查找表中未找到对应键值, 安全数据湖 将自动返回定义的默认值。
-
创建后,可在提取器、装饰器和管道规则中引用该查找表。
用法
查找表可应用于 安全数据湖 的多个领域以增强数据上下文:
-
转换器 – 在消息摄取期间对提取值执行查找。
-
装饰器 – 在搜索时丰富消息内容而不修改存储数据。
-
管道规则 – 通过
lookup()或lookup_value()函数动态应用逻辑。
内置数据适配器
安全数据湖 预置多种即用型数据适配器。每种类型在 编辑数据适配器 表单中均有屏幕文档说明。
|
适配器 |
描述 |
|---|---|
|
CSV文件适配器 |
从静态CSV文件执行键值查找。 |
|
DNS查询适配器 |
执行主机名与IP解析(A、AAAA、PTR及TXT记录)。 |
|
DSV文件适配器 |
类似于CSV,但支持自定义分隔符和可配置的键/值列。 |
|
HTTPS JSONPath适配器 |
执行GET请求并使用JSONPath表达式提取数据。 |
|
Geo IP – MaxMind |
利用MaxMind数据库提供IP地址的地理定位数据。 |
MongoDB
安全数据湖 新增对MongoDB数据适配器的支持,这些适配器将查询数据直接存储于 安全数据湖 配置数据库中。可通过API、图形界面或管道函数添加、更新或删除条目。
通过API管理MongoDB数据适配器
添加键的curl请求示例:
curl -u <token>:token \
-H 'X-Requested-By: cli' \
-H 'Accept: application/json' \
-H 'Content-Type: application/json' \
-X POST 'http://127.0.0.1:9000/api/plugins/org.graylog.plugins.lookup/lookup/adapters/mongodb/mongodb-data-name' \
--data-binary '{
"键": "myIP",
"值": ["12.34.42.99"],
"数据适配器ID": "5e578606cdda4779dd9f2611"
}'
注意
条目也可直接从 安全数据湖 界面管理,或通过管道规则使用查询相关函数动态修改。
提示
若需在图形界面中为单个键添加多个值,请用换行符分隔每个值。
地理定位
安全数据湖 支持从日志中的IP地址提取并可视化地理定位信息。
本文提供逐步指导,说明如何配置地理定位处理器并利用提取的数据创建地图。
配置处理器
安全数据湖 默认具备地理定位功能,但 仍需额外配置 。本节详细说明如何配置该功能。
注意
需创建账户获取许可证密钥以下载MaxMind数据库。更多信息请访问 MaxMind的博客文章
配置处理器
您需要配置 安全数据湖 以开始使用地理位置数据库解析日志中的IP地址。
-
导航至 系统 > 配置 .
-
选择 插件 > 地理位置处理器 ,然后点击 编辑配置 .
-
勾选 启用地理位置处理器 复选框。
-
从下拉菜单中选择MaxMind或IPInfo。
-
输入您使用的城市数据库和ASN数据库的路径。您还可以调整刷新间隔。
-
选择 更新配置 以保存配置。
Illuminate与地理位置
地理位置配置需在 安全数据湖 开启状态下使用。Illuminate 无需 需使用地理位置数据。
若需在Illuminate内容中获取地理位置数据,必须确保在消息处理器配置中,Illuminate处理器运行于GeoIP解析器之前。请注意该顺序应为默认设置。
检查当前环境配置:
-
进入 系统 > 配置 .
-
选择 消息处理器 ,随后在表格中确认顺序。
如需调整顺序:
-
选择 编辑配置 .
-
通过拖放按需调整列表中项目顺序。
-
选择 更新配置 .
-
强制 安全数据湖 架构选项
配置地理位置处理器时,默认会选中 强制默认架构 选项。若禁用架构强制,所有非保留IP地址的IP字段都将被处理,并自动添加以下前缀字段:
-
_地理位置 -
_国家代码 -
_城市名称
以
源IP
字段为例,生成字段可能显示为:
-
源IP_城市名称: 维也纳 -
source_ip_country_code: AT -
source_ip_geolocation: 48.20849, 16.37208
若启用模式强制,则仅处理以下非保留IP地址的GIM模式字段:
-
destination_ip -
destination_nat_ip -
event_observer_ip -
host_ip -
network_forwarded_ip -
source_ip -
source_nat_ip
以下是为
source_ip
字段生成的示例内容:
-
source_as_number: AS1853 -
source_as_organization: ACONET -
source_geo_city: 维也纳 -
source_geo_coordinates: 48.20849, 16.37208 -
source_geo_country_iso: AT -
source_geo_name: 维也纳, AT -
source_geo_region: 维也纳 -
source_go_timezome: Europe/Vienna
在AWS S3中存储地理定位数据库文件
一个配置选项 从S3存储桶拉取文件 位于配置页面底部,允许从AWS S3存储桶拉取地理定位数据库文件。启用此功能后,可将S3存储桶URL添加到路径配置值中。
当 启用 时,服务会在每个刷新间隔运行,并轮询提供的S3存储桶中的文件。如果这些文件自上次轮询后已更新,则新文件将被拉取到每个节点上。该服务依赖于 默认凭证提供程序 获取S3存储桶的凭证,不使用任何可能在 安全数据湖 AWS插件配置设置中设置的配置值。
从S3获取的地理定位数据库文件存储在
安全数据湖
data_dir
目录下的
geolocation
子目录中。要更改这些文件的下载位置,请在
geo_ip_processor_s3_download_location
中设置磁盘上的所需位置,位于
安全数据湖
服务器配置文件中。
如果保留 禁用 从S3存储桶拉取文件的选项,所有 安全数据湖 节点将从磁盘上的路径读取文件,并需要手动更新这些文件以获取更新。
在地图中可视化地理定位
安全数据湖
可显示存储在任何字段中的地理位置地图,只要地理坐标点采用
纬度,经度
格式。
在搜索结果页面显示地图
在任何搜索结果页面,您都可以在搜索侧边栏展开要用于绘制地图的字段。点击左侧边栏的创建按钮(+),在通用菜单下选择聚合。
这将生成一个空的聚合组件。点击 编辑 并输入信息。选择世界地图作为可视化类型,随后即可看到该字段中存储的所有坐标点构成的地图。
您可以点击 更新预览 查看地图效果,并在点击'更新组件'前进行修改。
注意
添加度量指标会影响地图上圆点的大小。若未定义指标,所有圆点半径相同。
关于 安全数据湖 中涉及不同地理坐标来源的其他字段,请查阅 安全数据湖 模式 .
将地图添加到仪表板
您可以像添加其他组件一样,将地图可视化添加到任意仪表板。在搜索结果页面显示地图时:
-
点击右上角的三点图标
-
选择 导出到仪表板 .
随后可对新仪表板进行重命名、编辑和保存。
数据适配器
ThreatFox威胁指标追踪数据适配器
ThreatFox是 abuse.ch 该项目追踪与恶意软件相关的危害指标(IOCs)。ThreatFox数据适配器支持通过以下关键类型进行查询:
-
URL
-
域名
-
IP:端口
-
MD5哈希值
-
SHA256哈希值
创建数据适配器时,ThreatFox会将数据集下载并存储到MongoDB中。
刷新间隔
配置参数用于确定何时获取新数据集。
示例查询数据
对文件哈希值
923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4
的查询可能产生以下输出:
{
"首次出现时间(UTC)": "2021-07-07T17:03:57.000+0000",
"IOC编号": "158365",
"IOC值": "923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4",
"IOC类型": "sha256哈希",
"威胁类型": "载荷",
"关联恶意软件": "win.agent_tesla",
"恶意软件别名": [
"AgenTesla",
"AgentTesla",
"Negasteal"
],
"恶意软件显示名": "Agent Tesla",
"置信度": 50,
"参考链接": "https://twitter.com/RedBeardIOCs/status/1412819661419433988",
"标签": [
"agenttesla"
],
"匿名提交": false,
"提交者": "Virus_Deck"
}
配置数据适配器
-
标题-
数据适配器的简短标题。
-
-
描述-
数据适配器的描述信息。
-
-
名称-
数据适配器的唯一名称。
-
-
自定义错误TTL-
用于缓存错误结果的自定义TTL(可选)。默认值为5秒。
-
-
包含超过90天的IOCs-
可选设置,包含超过90天的IOCs。默认情况下,数据适配器的数据不包含超过90天的IOCs。为避免误报,请谨慎处理超过90天的IOCs。
-
-
刷新间隔- 决定获取新数据的频率。最小刷新间隔为3600秒(1小时),因为源数据的更新频率就是每小时一次。 -
不区分大小写查询- 允许数据适配器执行不区分大小写的查询。
URLhaus恶意软件URL数据适配器
URLhaus是一个来自
abuse.ch
的项目,负责维护用于恶意软件分发的恶意URL数据库。创建数据适配器时,URLhaus会下载并将相应数据集存储于MongoDB中。
刷新间隔
配置用于设定获取新数据集的时间。
示例查询数据
对URL
https://192.168.100.100:35564/Mozi.m
的查询可能产生如下输出:
{
"single_value": "malware_download",
"multi_value": {
"date_added": "2021-06-22T17:53:07.000+0000",
"url_status": "online",
"threat_type": "malware_download",
"tags": "elf,Mozi",
"url": "http://192.168.100.100:35564/Mozi.m",
"urlhaus_link": "https://urlhaus.abuse.ch/url/1234567/"
},
"string_list_value": null,
"has_error": false,
"ttl": 9223372036854776000
}
配置数据适配器
-
标题-
数据适配器的简短标题。
-
-
描述-
数据适配器的说明。
-
-
名称-
引用数据适配器的唯一名称。
-
-
自定义错误TTL-
用于缓存错误结果的可自定义TTL。若未指定值,则默认为5秒。
-
-
URLhaus数据源类型-
决定数据适配器将使用哪个URLhaus数据源。
-
在线URL是较小数据集,仅包含当前检测为在线的URL。 -
最近添加的URL是较大数据集,包含最近30天内添加的所有在线和离线URL。
-
-
刷新间隔- 决定获取新数据的频率。最小刷新间隔为300秒(5分钟),因为源数据更新频率即为此间隔。 -
不区分大小写查询- 允许数据适配器执行不区分大小写的查询。