Link Search Menu Expand Document

任务

目录

  1. 下采样任务
    1. 在 InfluxDB UI 中创建下采样任务
    2. 使用 InfluxDB v2 API 创建下采样任务
    3. 使用 CLI 创建下采样任务
  2. 告警任务
    1. 导入包并配置任务选项
    2. 配置 Slack 端点
    3. 创建一个自定义函数来处理条件查询逻辑
    4. 查询我们的数据
  3. 使用任务将数据从 OSS 写入云端
    1. 使用 to() 函数将数据从 OSS 整合到 InfluxDB Cloud。
    2. to() 函数的限制和替代方案。
    3. 在下采样任务中可接受的 to() 用法
  4. 理解 _task 桶
  5. 使用 UI 调试和管理任务
  6. 使用 InfluxDB API v2 调试和管理任务
  7. 使用 CLI 调试和管理任务
  8. 延伸阅读

任务是按用户定义的计划运行的 Flux 脚本。任务用于各种功能,包括

  • 将高分辨率数据下采样为低分辨率聚合
  • 对数据应用其他数据转换
  • 向通知端点发送通知
  • 将数据从 OSS 写入云端
  • 使用 http.get() 函数持续写入

由于任务主要用于转换持续写入的数据,因此在本节中,我们假设您已按照上述 telegraf 部分中的说明配置了 CPU 输入插件。

下采样任务

最常见的任务类型之一是下采样任务。下采样任务用于将高分辨率数据转换为低分辨率聚合。下采样任务是有效的时间序列数据库管理的关键部分,因为它们允许您

  • 通过仅保留短时间的高分辨率数据来最大限度地减少总体磁盘使用量
  • 使您能够捕获历史数据的趋势

任务是按计划执行的 Flux 脚本。您可以使用任务执行许多其他功能。您可以使用 InfluxDB UIInfluxDB v2 APICLIVS Code 创建按计划执行此下采样的任务。

在 InfluxDB UI 中创建下采样任务

在 InfluxDB UI 中创建下采样任务有两种方法

  1. 从**数据浏览器**
  2. 从**任务**页面

在本节中,我们将介绍第一种方法,因为我认为它提供了最自然的工作流程。使用**数据浏览器**试验数据的不同物化视图。

data explorer

在上面的屏幕截图中,我们正在查询 CPU 数据,并应用 aggregateWindow() 函数来计算每分钟的平均值。我们还使用 toInt() 函数转换数据。

save query as task

对要应用的聚合或 Flux 转换感到满意后,选择**保存**按钮。通过选择**另存为**窗口中的**任务**选项卡将查询保存为任务。命名您的任务并指定任务的**每**选项。这将指定您希望运行任务的频率。还要包括一个**偏移量**以避免任何读写冲突。偏移量配置选项会延迟任务的执行,但会根据每个选项查询数据。查看文档以获取完整的任务配置选项列表

使用 InfluxDB v2 API 创建下采样任务

要使用 InfluxDB v2 API 创建下采样任务,您必须向 /api/v2/tasks 端点提交 POST 请求。包括您的令牌、orgID、任务的 Flux 以及任务的状态。活动任务将按每个间隔执行任务。由于请求的正文是 JSON,请确保转义 Flux 中的任何双引号。

curl --location --request POST \
'https://us-west-2-1.aws.cloud2.influxdata.com/api/v2/tasks' \
--header 'Authorization: Token XXX==' \
--header 'Content-Type: application/json' \
 --data-raw '{"orgID": "0437f6d51b579000", 
 "status": "inactive", 
 "flux": "option task = {name: \"Downsampling and transformation\", every: 15m, offset: 1m}\n from(bucket: \"system\")\n |> range(start: -task.every)\n |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\")\n |> filter(fn: (r) => r[\"_field\"] == \"usage_user\")\n |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\")\n |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)\n |> to(bucket: \"downsample\")\n", 
 "description": "downsamples total cpu usage_user data into 1m averages every 15m"
}'

如果您的请求成功,它将产生以下结果

 {"links":{"labels":"/api/v2/tasks/0891b50164bf6000/labels",
           "logs":"/api/v2/tasks/0891b50164bf6000/logs",
           "members":"/api/v2/tasks/0891b50164bf6000/members",
           "owners":"/api/v2/tasks/0891b50164bf6000/owners",
           "runs":"/api/v2/tasks/0891b50164bf6000/runs","self":"/api/v2/tasks/0891b50164bf6000"},
           "labels":[],
           "id":"0891b50164bf6000",
           "orgID":"XXX",
           "org":"[email protected]",
           "ownerID":"XXX",
           "name":"mytask",
           "description":"downsamples total cpu usage_user data into 5m averages every 10m",
           "status":"active",
           "flux":"option task = {name: \"mytask\", every: 10m, offset: 1m}\n from(bucket: \"system\")\n |\u003e range(start: -task.every)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"cpu\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"usage_user\")\n |\u003e filter(fn: (r) =\u003e r[\"cpu\"] == \"cpu-total\")\n |\u003e aggregateWindow(every: 5m, fn: mean, createEmpty: false)\n |\u003e to(bucket: \"downsampled\")\n",
           "every":"10m",
           "offset":"1m",
           "latestCompleted":"2021-12-07T22:10:00Z",
           "lastRunStatus":"success",
           "createdAt":"2021-12-07T21:39:48Z",
           "updatedAt":"2021-12-07T22:20:16Z"}

请注意,结果运行任务 ID。这将在以后调试任务时很有用。

使用 CLI 创建下采样任务

要使用 CLI 创建任务,请使用 influx task create 命令并提供 Flux 脚本的位置。

influx task create --file /path/to/example-task.flux

告警任务

在下一节中,我们将学习有关 InfluxDB 的检查和通知系统的所有知识,该系统允许您根据时间序列数据创建自定义告警。但是,您不必使用检查和通知系统来发送告警。相反,如果您愿意,您可以选择完全绕过系统并使用任务发送告警。这不推荐做法。使用任务完全绕过检查和通知系统会导致以下缺点

  • 没有关于您的告警的可见性或元数据
  • 缺乏结构化的告警管道

但是,了解这种类型的任务有助于理解检查和通知系统。此外,这种方法可能足以满足不依赖于告警可见性的爱好者。

对于此示例,假设我们想根据在指定时间段内发生的事件数量创建告警。如果事件数量超过阈值,那么我们希望收到 Slack 通知。对于此任务,假设当某些事情发生时,我们将数据推送到 InfluxDB。换句话说,我们正在写入事件数据而不是指标。

导入包并配置任务选项

第一步是像往常一样导入必要的包并配置任务选项。

import "array"
import "slack"

option task = { name: "Event Alert", every: 1h0m0s, offset: 5m0s }

配置 Slack 端点

我们将使用 slack.message() 函数将通知写入我们的 Slack 端点。要查找您的 Slack 传入 Webhook URL,请访问 https://api.slack.com/apps。然后**创建新应用**。

slack api

指定您要赋予应用的应用名称以及您要使用的开发 Slack 工作区。

create a slack app

接下来,在**传入 Webhook** 页面上使用切换开关**激活传入 Webhook**。

incoming webhook

现在您已准备好向**工作区**添加**新 Webhook**。

webhook urls

此时,Slack 将请求访问您的 Slack 工作区的权限,并要求您选择要发布到的频道。

permission to access slack

现在,您应该能够在 Slack 频道中看到此权限访问和 Webhook 集成。

added notification to channel

最后,您为您的工作区生成了一个 Webhook URL。您将在使用 slack.message() 函数时(或稍后在“检查和通知”部分中介绍的通过 InfluxDB UI 创建通知端点时)使用此 URL 来接收通知。

webhook for your workspace

现在您已经拥有了工作区的 Slack webhook URL,我们就可以在 Flux 任务中使用 slack.message() 函数将告警发送到 Slack 工作区了。 slack.message() 函数需要 4 个参数

  1. url:这是 Slack API URL 或 Slack Webhook,即我们在上一节中收集的 URL。
  2. channel:您要将消息发布到的 Slack 频道。如果您只想向自己发送通知,则无需提供任何值。
  3. token:如果您使用的是 Slack API URL,则所需的 Slack API 令牌。由于我们使用的是 Slack Webhook,因此您无需提供任何值。
  4. text:您要发送到 Slack 的消息。我们可以通过插入值并使用 ${} 表示法将它们作为字符串返回来在消息中包含任何浮点数、整数或时间戳。

例如,如果我们想测试我们的 Slack webhook,请使用以下 Flux

import "slack"
numericalValue = 42

slack.message(
  url: "https://hooks.slack.com/services/####/####/####",
  text: "This is a message from the Flux slack.message() function the value is ${numericalValue}.",
)

创建自定义函数来处理条件查询逻辑

接下来,我们将创建一个自定义函数,如果事件数量超过我们的阈值,则发送 Slack 消息。我们必须创建一个自定义函数来封装条件查询逻辑,因为您不能在 Flux 的函数外部包含条件逻辑。

alert = (eventValue, threshold) =>
   (if eventValue >= threshold then slack.message(
       url: "https://hooks.slack.com/services/####/####/####",
       text: "An alert event has occurred! The number of field values= \"${string(v: eventValue)}\".",
       color: "warning",
   ) else 0)

我们的自定义函数称为 alert。它接受两个参数

  1. eventValue:根据 task.every 选项配置的最后一小时内发生的事件数量。
  2. threshold:最后一小时内可接受事件数量的阈值。

我们使用条件逻辑来指定,如果事件数量超过阈值,则应调用 slack.message() 函数并向 Slack 发送通知消息。然后,我们调用 slack.message() 函数,并将 Webhook URL 和通知消息分别传递给 urltext 参数。我们插入自定义函数的 eventValue 参数并将其包含在消息中。

查询数据

最后,我们必须查询数据以计算自上次任务运行以来发生的事件数量。我们将结果存储在变量 data 中。我们过滤数据并使用 sum() 函数查找 task.every 持续时间内的事件总数。

data = from(bucket: "bucket1")  
   |> range(start: -task.every, stop: now())
   |> filter(fn: (r) =>
       (r._measurement == "measurement1" and r._field == "field1" and exists r._value))
   |> sum()

我们还必须构建一个虚拟表 data_0,以处理过去一小时内未发生任何事件的情况,以便我们可以成功地将值传递给自定义 alert() 函数。我们使用 array.from() 函数构建具有单个 _value 列和单个值 0 的表。

data_0 = array.from(rows: [{_value: 0}])

接下来,我们将两个表合并在一起,将它们分组在一起,并将两个表的值加在一起。由于 data_0 表的值为 0,因此 events 表的 _value 列的值将为

  • 0,当没有事件发生或 data 表未返回结果时
  • 0 + n,当发生 n 个事件或 data 表返回结果时

我们使用 findRecord() 函数从 _value 列中提取事件数量。我们必须 yield 我们的虚拟表,因为 Flux 任务有一个要求:您必须调用流数据。如果您希望 Flux 团队优先取消此要求,请对此 issue #3726 发表评论。

events = union(tables: [data_0, data])
   |> group()
   |> sum()
   |> findRecord(fn: (key) =>
       (true), idx: 0)
eventTotal = events._value

data_0
   |> yield(name: "ignore")
alert(eventValue: eventTotal, threshold: 1)

最后,我们可以调用自定义 alert() 函数。我们将 eventTotal 值作为 eventValue 传入,并手动指定阈值。我们的 Flux 代码如下所示

import "array"
import "slack"

option task = { name: "Event Alert", every: 1h0m0s, offset: 5m0s }

alert = (eventValue, threshold) =>
   (if eventValue >= threshold then slack.message(
       url: "https://hooks.slack.com/services/####/####/####",
       text: "An alert event has occurred! The number of field values= \"${string(v: eventValue)}\".",
       color: "warning",
   ) else 0)
data = from(bucket: "bucket1")  
   |> range(start: -task.every, stop: now())
   |> filter(fn: (r) =>
       (r._measurement == "measurement1" and r._field == "field1" and exists r._value))
   |> sum()
data_0 = array.from(rows: [{_value: 0}])
events = union(tables: [data_0, data])
   |> group()
   |> sum()
   |> findRecord(fn: (key) =>
       (true), idx: 0)
eventTotal = events._value

data_0
   |> yield(name: "ignore")
alert(eventValue: eventTotal, threshold: 1)

使用任务将数据从 OSS 写入云端

在将数据写入 InfluxDB 时,您有很多选择。您可以

  • 将数据写入边缘或雾设备上的 OSS 实例
  • 将数据从您的 IoT 设备直接写入 InfluxDB Cloud
  • 将数据写入边缘设备上的 InfluxDB OSS 实例,并临时将数据推送到 InfluxDB Cloud。

最后一个选项是维护和管理 IoT 设备群最强大、最灵活的方式。该架构为您提供多种优势,包括

  • 能够在将数据发送到 InfluxDB Cloud 之前执行任务来准备和清理数据。这不仅可以帮助您创建一个组织良好的数据管道,还可以确保您的数据在各个 IoT 设备上都已标准化。
  • 灵活地仅将选定的数据发送到 InfluxDB Cloud,例如在 OSS 实例中标记的异常。
  • 可以选择隔离任务工作负载并使其靠近数据源。
  • 在边缘设备和 InfluxDB Cloud 实例以及最终客户之间增加一层安全性的好处。

architecture drawing

最后一个选项的架构图。传感器将数据写入边缘的 InfluxDB OSS 实例,该实例又将数据写入 InfluxDB Cloud。

使用 to() 函数将数据从 OSS 整合到 InfluxDB Cloud。

实现此架构的一种方法是使用 to() 函数。to() 函数现在支持将数据从 OSS v2.1 实例写入 InfluxDB Cloud 实例的选项。

oss to cloud 使用 to() 函数将数据从 OSS 实例写入 InfluxDB Cloud 的示例。在右侧,用户正在查询 OSS 实例。在右侧,InfluxDB Cloud 实例显示了数据使用 to() 函数写入 InfluxDB Cloud 后 OSS 实例中 Flux 脚本的结果。

要使用 to() 函数从 InfluxDB Cloud 写入数据

  1. 查询您的数据。
from(bucket: "RoomSensors")
    |> range(start: -10m)
    |> filter(fn: (r) => r["_measurement"] == "temperature")
    |> filter(fn: (r) => r["_field"] == "temperature")
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
  1. 指定目标主机 URL、orgID、令牌和存储桶 ID。
|> to(host: https://us-west-2-1.aws.cloud2.influxdata.com, orgID: XXX,token: XXX==, bucketID: XXX)

您需要先拥有一个 InfluxDB Cloud 帐户才能试用此功能。免费套餐帐户即可。您可以在此处轻松注册帐户。注册后,您需要

to() 函数的限制和替代方案。

to() 函数提供了一种将数据从边缘设备写入云端的简单方法。但是,它有一些限制

  • 通过 HTTP 发送
  • 没有内置的故障处理功能
  • 没有内置的批处理、重试或并行化功能。

仅当您满足以下条件时,才应使用 to() 函数将数据从 OSS 整合到云端

  • 您打算在将数据写入云端之前先对其进行下采样以限制请求的大小。
  • 您拥有的设备数量很少,或者以这种方式写入的数据量相对较小。您不能将 to() 函数用于大型工作负载,因为数据量可能会导致网络故障。也没有内置的故障处理功能。
  • 您并未尝试通过微批处理和生成非常高的请求计数来克服写入大量数据的问题。

虽然 to() 函数有一些限制,但还有另一种将 IoT 数据从 OSS 写入 InfluxDB Cloud 的选项。如果您想处理更大的工作负载,可以使用 mqtt.to() 函数先将数据写入 MQTT 代理。

to() 在下采样任务中的可接受用法

现在我们了解了 to() 函数的局限性。让我们重点介绍一个它可以成功的示例。在本例中,我们假设我们要使用任务按计划对数据进行下采样和写入。

option task = {
    name: "downsample and consolidate task",
    every: 1d,
    offset: 10m,
}
import "http"
import "influxdata/influxdb/secrets"


myToken = "Token ${secrets.get(key: "cloud-token")}"
myUrl= "https://us-west-2-1.aws.cloud2.influxdata.com"
myOrgID = "Token ${secrets.get(key: "cloud-org")}"
myBucketID = "Token ${secrets.get(key: "cloud-bucket")}"


from(bucket: "sensorData")
|> range(start: -task.every)
|> filter(fn: (r) =>
(r["_measurement"] == "sensors"))
|> mean()
|> to(host: myurl, orgID:   myOrgID, token: myToken, bucketID: myBucketID)

此任务将一个测量值中的所有数据聚合到一个每日平均值中,然后再将其写入 InfluxDB Cloud。

了解 _task 存储桶

每次运行任务时,有关任务的元数据都会存储在默认的 _task 存储桶中。它包含以下架构

  • 1 个测量值:runs
  • 2 个标签,具有以下标签值
    • status
      • successful
      • unsuccessful
    • taskID
      • 您创建的所有任务的任务 ID 列表
  • 9 个字段值
    • errorMessage
    • finishedAt
    • lastSuccess
    • logs
    • name
    • requestedAt
    • runID
    • shceduledFor
    • startedAt

此数据可以帮助您调试和管理任务。我们将在后续部分中详细介绍如何使用此存储桶中的数据。

使用 UI 调试和管理任务

要使用 UI 调试任务,请导航到“任务”页面,然后单击您的任务名称,例如**下采样和转换**任务。

view task

在这里,您可以看到已计划的任务运行列表。您可以确定任务是否成功运行、运行何时开始以及运行执行所需的时间。您还可以选择立即**运行任务**。这在调试任务时特别有用,因为您不想等到下次计划运行任务时才验证您的更改是否纠正了问题。您希望能够立即测试您的任务是否成功运行。最后,您可以使用任务名称正下方的**切换**将任务状态从活动更改为非活动,以停止任务运行。

downsample task

您还可以选择特定运行的**查看日志**以查看该运行的详细日志。任何错误消息都将显示在此处,这可以帮助您调试任务。

task logs

如果您关闭**运行日志**窗口并返回到您的任务,请单击**编辑任务**以查看从使用 InfluxDB UI 创建任务时自动生成的 Flux。您还可以在此处对底层 Flux 进行编辑。

edit task

最后,您可以查询 _task 存储桶以返回有关任务的元数据,以帮助您调试任务。例如,通过识别上次成功运行的任务,您可以更轻松地查明导致任务失败的系统更改。

task bucket

使用 InfluxDB API v2 调试和管理任务

您可以使用 InfluxDB API v2 执行各种任务操作

使用以下命令列出所有任务

curl --location --request GET 'http://us-west-2-1.aws.cloud2.influxdata.com/api/v2/tasks?org [email protected]' \
--header 'Authorization: Token XXX=='

此请求产生以下响应

 {"links":{"labels":"/api/v2/tasks/0891b50164bf6000/labels",
           "logs":"/api/v2/tasks/0891b50164bf6000/logs",
           "members":"/api/v2/tasks/0891b50164bf6000/members",
           "owners":"/api/v2/tasks/0891b50164bf6000/owners",
           "runs":"/api/v2/tasks/0891b50164bf6000/runs","self":"/api/v2/tasks/0891b50164bf6000"},
           "labels":[],
           "id":"0891b50164bf6000",
           "orgID":"XXX",
           "org":"[email protected]",
           "ownerID":"XXX",
           "name":"mytask",
           "description":"downsamples total cpu usage_user data into 5m averages every 10m",
           "status":"active",
           "flux":"option task = {name: \"mytask\", every: 10m, offset: 1m}\n from(bucket: \"system\")\n |\u003e range(start: -task.every)\n |\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"cpu\")\n |\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"usage_user\")\n |\u003e filter(fn: (r) =\u003e r[\"cpu\"] == \"cpu-total\")\n |\u003e aggregateWindow(every: 5m, fn: mean, createEmpty: false)\n |\u003e to(bucket: \"downsampled\")\n",
           "every":"10m",
           "offset":"1m",
           "latestCompleted":"2021-12-07T22:10:00Z",
           "lastRunStatus":"success",
           "createdAt":"2021-12-07T21:39:48Z",
           "updatedAt":"2021-12-07T22:20:16Z"}

您还可以通过在 URL 中提供任务 ID 来检索特定任务,使用

curl --location --request GET 'http://us-west-2-1.aws.cloud2.influxdata.com/api/v2/tasks/0891b50164bf6000' \
--header 'Authorization: Token XXX=='

在这种情况下,请求将返回与之前列出所有任务的请求相同的响应,因为我们只有一个任务。

您可以使用 InfluxDB API v2 编辑任务,类似于创建任务的方式。这还允许您更改任务的状态。

​​curl --location --request PATCH 'http://us-west-2-1.aws.cloud2.influxdata.com/api/v2/tasks/0891b50164bf6000' \
--header 'Authorization: Token XXX==' \
--header 'Content-Type: application/json' \
--data-raw '{"description": "updated my task", 
"every": "10m", 
"flux": "<new flux script with task options included>", 
"name": "new name",
"offset": "1m", 
"status": "inactive"}'

使用 CLI 调试和管理任务

使用 influx task list 命令列出所有任务及其对应的名称、任务 ID、组织、状态、Every、Cron 值。

例如,运行 influx task list 将返回以下内容

ID			Name	Organization                        ID		            Organization		    Status		Every	Cron
0891b50164bf6000 	Downsampled and transformed			0437f6d51b579000	[email protected]	inactive	15m	

使用 influx task log list 命令通过指定运行 ID 列出特定运行的日志。获取运行 ID 的最简单方法是执行以下查询

influx task log list --task-id 0891b50164bf6000 --run-id 0891b54779033002

此命令产生以下输出

RunID			    Time					                Message
0891b54779033002	2021-12-07 21:41:00.055420881 +0000 UTC	Started task from script: "option task = {name: \"mytask\", every: 10m, offset: 1m}\n from(bucket: \"system\")\n |> range(start: -task.every)\n |> filter(fn: (r) => r[\"_measurement\"] == \"cpu\")\n |> filter(fn: (r) => r[\"_field\"] == \"usage_user\")\n |> filter(fn: (r) => r[\"cpu\"] == \"cpu-total\")\n |> aggregateWindow(every: 5m, fn: mean, createEmpty: false)\n |> to(bucket: \"downsampled\")\n"
0891b54779033002	2021-12-07 21:41:00.33920522 +0000 UTC	trace_id=24cfcf9895f08ea9 is_sampled=false
0891b54779033002	2021-12-07 21:41:00.671075325 +0000 UTC	Completed(success)

您可以通过查询 _task 存储桶来获取运行 ID。

from(bucket: "_tasks")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "runs")
|> filter(fn: (r) => r["_field"] == "runID")
|> filter(fn: (r) => r["status"] == "success")
|> filter(fn: (r) => r["taskID"] == "0891b50164bf6000")
|> yield(name: "runID")

现在,使用 influx task update 命令并传入新 Flux 脚本的文件位置来更新任务。

influx task update --id 0891b50164bf6000 --file /path/to/example-task.flux

使用相同的命令,但使用 –status 标志来激活或停用任务

influx task update --id 0891b50164bf6000 --status inactive

下一节

进一步阅读

  1. TL;DR InfluxDB 技术提示:使用 InfluxDB 调试和监控任务
  2. TL;DR InfluxDB 技术提示:使用 Flux 将 IoT 数据从边缘传输到云端
  3. TL;DR InfluxDB 技术提示 – 监控任务并查找失控基数的来源
  4. 使用 InfluxDB v2.0 进行下采样
  5. TL;DR InfluxDB 技术提示 – 如何使用 InfluxDB 监控状态
  6. 太长不看:InfluxDB 技术技巧 – 使用任务和检查进行监控