配置调度任务告警机器人,如果使用wechat插件,需要企业在微信中生成的秘钥,秘钥只有公司管理员才会生成,鉴于执行太过麻烦,因此直接通过webhook的方式配置机器人。

0X00 开通群组机器

在企业微信的群组中,在右上角的三个小点点中,点击下拉找到“添加群机器人”,生成一条webhook url。下图中左侧是告警的实际效果。
企业微信机器人
企业微信机器人
企业微信机器人告警信息

0X01 编写脚本

参考企微SDK接口,配合大数据调度平台的script alter的能力,编写符合SDK接口数据的格式。下面是一些常见的告警内容,dolphinAlterServer通过-c参数传入脚本,特别说明,DAS传入三个参数,-t 是告警标题,-c 是告警内容,-p 是用户自定义参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// 告警内容格式1:
[
{
"projectCode": 9714355320480,
"projectName": "煤炭交易",
"owner": "admin",
"processId": 458,
"processDefinitionCode": 10046305882272,
"processName": "【CCTD】所有指标调度流-5-20230630023000986",
"processType": "RECOVER_TOLERANCE_FAULT_PROCESS",
"processState": "SUCCESS",
"recovery": "YES",
"runTimes": 2,
"processStartTime": "2023-06-30 02:30:01",
"processEndTime": "2023-06-30 21:12:49",
"processHost": "10.10.1.4:5678"
}
]
// 告警内容格式2:
[
{
"projectCode": 9714355320480,
"projectName": "煤炭交易",
"owner": "admin",
"processId": 11310,
"processDefinitionCode": 10046305882272,
"processName": "【CCTD】所有指标调度流-5-20230703023000443",
"processType": "RECOVER_SERIAL_WAIT",
"processState": "FAILURE",
"runTimes": 1,
"processStartTime": "2023-07-03 02:30:00",
"processHost": "10.10.1.4:5678",
"event": "TIME_OUT",
"warnLevel": "MIDDLE"
}
]

// 告警内容格式3:
[
{
"type": "WORKER",
"host": "/nodes/worker/default/10.10.1.4:1234",
"event": "SERVER_DOWN",
"warningLevel": "SERIOUS"
}
]
// 告警内容格式4:
[
{
"type": "MASTER",
"host": "/nodes/master/10.10.1.5:5678",
"event": "SERVER_DOWN",
"warningLevel": "SERIOUS"
}
]

// 告警内容格式5:
[
{
"projectCode": 9804816236064,
"projectName": "煤炭工商",
"owner": "admin",
"processId": 4065,
"processDefinitionCode": 9804833177504,
"processName": "煤企工商信息-15-20230630092835811",
"taskCode": 9804952245280,
"taskName": "煤企同步到ADS",
"taskType": "DATAX",
"taskState": "FAILURE",
"taskStartTime": "2023-06-30 22:45:21",
"taskEndTime": "2023-06-30 22:47:16",
"taskHost": "10.10.1.5:1234",
"logPath": "/var/log/udp/2.0.0.0/dolphinscheduler/worker-server/logs/20230630/9804833177504_15-4065-11605.log"
}
]

企业支持简单的markdown语法,可以将任务执行的结果、状态、耗时等信息组装为数据包传入钩子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/bin/bash
while getopts "t:c:p:" opts; do
case $opts in
t) t_value=$OPTARG ;;
c) c_value=$OPTARG ;;
p) p_value=$OPTARG ;;
?) ;;
esac
done
echo $c_value" \n" >> /srv/udp/2.0.0.0/dolphinscheduler/alert-server/script/output.log

# 使用 jq 检查是否存在数据对象
dataCount=$(echo "$c_value" | jq length)
if [[ $dataCount -gt 0 ]]; then
# 工作流实例告警
if echo "$c_value" | jq 'map(has("processType")) | any' | grep -q "true"; then
# 使用 jq 提取数组中的值
projectCode=$(echo "$c_value" | jq '.[0].projectCode' | tr -d '"')
projectName=$(echo "$c_value" | jq '.[0].projectName' | tr -d '"')
owner=$(echo "$c_value" | jq '.[0].owner' | tr -d '"')
runTimes=$(echo "$c_value" | jq '.[0].runTimes' | tr -d '"')
processId=$(echo "$c_value" | jq '.[0].processId' | tr -d '"')
processDefinitionCode=$(echo "$c_value" | jq '.[0].processDefinitionCode' | tr -d '"')
processName=$(echo "$c_value" | jq '.[0].processName' | tr -d '"')
processHost=$(echo "$c_value" | jq '.[0].processHost' | tr -d '"')
processState=$(echo "$c_value" | jq '.[0].processState' | tr -d '"')
processStartTime=$(echo "$c_value" | jq '.[0].processStartTime' | tr -d '"')
# 处理结束时间不存在的问题
if echo "$c_value" | jq '. | map(has("'"$key"'")) | contains([false])'; then
endTimeNotExists=true
processEndTime=$(date +"%Y-%m-%d %H:%M:%S")
else
processEndTime=$(echo "$c_value" | jq '.[0].processEndTime' | tr -d '"')
fi

startTimestamp=$(date -d "$processStartTime" +%s)
endTimestamp=$(date -d "$processEndTime" +%s)


duration=$((endTimestamp - startTimestamp))
# 将时长转换为小时、分钟和秒
seconds=$((duration % 60))
minutes=$((duration / 60 % 60))
hours=$((duration / 3600))

# 打印时长
if [[ $hours -eq 0 && $minutes -eq 0 ]]; then
runTime="$seconds 秒"
elif [[ $hours -eq 0 ]]; then
runTime="$minutes 分钟 $seconds 秒"
else
runTime="$hours 小时 $minutes 分钟 $seconds 秒"
fi

if [ "$endTimeNotExists" = "true" ]; then
title="<font color=\\\"warning\\\">工作流执行预警</font>"
else
if [ "${processState}" = "SUCCESS" ]; then
title="<font color=\\\"info\\\">工作流执行预警</font>"
else
title="<font color=\\\"warning\\\">工作流执行预警</font>"
fi
fi
content="**${title}** \\n
>**项目名称**:${projectName}
>**负责人**:${owner}
>**工作流实例**:${processName}
>**工作流Code**:${processDefinitionCode}
>**执行结果**:<font color=\\\"info\\\">${processState}</font>
>**执行次数**:${runTimes}
>**执行节点**:${processHost}
>**开始时间**:${processStartTime}"

if [ -z "$endTimeNotExists" ]; then
content="${content}
>**结束时间**:${processEndTime}"
fi
if echo "$c_value" | jq 'has("event")' | grep -q "true"; then
event=$(echo "$c_value" | jq '.[0].event' | tr -d '"')
content=${content}"
>**事件类型**: ${event}"
fi

if [ "$endTimeNotExists" = "true" ]; then
content=${content}"
>**任务耗时**:<font color=\\\"warning\\\">**${runTime}**</font>"
else
if [ "${processState}" = "SUCCESS" ]; then
content=${content}"
>**任务耗时**:<font color=\\\"warning\\\">**${runTime}**</font>"
else
content=${content}"
>**任务耗时**:<font color=\\\"warning\\\">**${runTime}**</font>"
fi
fi
elif echo "$c_value" | jq 'map(has("type")) | any' | grep -q "true"; then
# [{"type":"MASTER","host":"/nodes/master/10.10.1.5:5678","event":"SERVER_DOWN","warningLevel":"SERIOUS"}]
type=$(echo "$c_value" | jq '.[0].type' | tr -d '"')
host=$(echo "$c_value" | jq '.[0].host' | tr -d '"')
event=$(echo "$c_value" | jq '.[0].event' | tr -d '"')

title="服务节点异常告警"
content="**<font color=\\\"warning\\\">${title}</font>**
>**服务类型**: ${type}
>**主机节点**:${host}
>**告警事件**:${event}"
elif echo "$c_value" | jq 'map(has("taskType")) | any' | grep -q "true"; then
# [{"projectCode":9804816236064,"projectName":"煤炭工商","owner":"admin","processId":14928,
# "processDefinitionCode":10026217228576,"processName":"ods工商信息数据更新-4-20230703093459074",
# "taskCode":10101440248096,"taskName":"删除分区","taskType":"SHELL","taskState":"FAILURE",
# "taskStartTime":"2023-07-03 09:35:02","taskEndTime":"2023-07-03 09:35:04","taskHost":"10.10.1.8:1234",
# "logPath":"/var/log/udp/2.0.0.0/dolphinscheduler/worker-server/logs/20230703/10026217228576_4-14928-11922.log"}]
projectCode=$(echo "$c_value" | jq '.[0].projectCode' | tr -d '"')
projectName=$(echo "$c_value" | jq '.[0].projectName' | tr -d '"')
owner=$(echo "$c_value" | jq '.[0].owner' | tr -d '"')
processId=$(echo "$c_value" | jq '.[0].processId' | tr -d '"')
processDefinitionCode=$(echo "$c_value" | jq '.[0].processDefinitionCode' | tr -d '"')
processName=$(echo "$c_value" | jq '.[0].processName' | tr -d '"')

taskCode=$(echo "$c_value" | jq '.[0].taskCode' | tr -d '"')
taskName=$(echo "$c_value" | jq '.[0].taskName' | tr -d '"')
taskType=$(echo "$c_value" | jq '.[0].taskType' | tr -d '"')
taskState=$(echo "$c_value" | jq '.[0].taskState' | tr -d '"')

taskStartTime=$(echo "$c_value" | jq '.[0].taskStartTime' | tr -d '"')
taskEndTime=$(echo "$c_value" | jq '.[0].taskEndTime' | tr -d '"')
taskHost=$(echo "$c_value" | jq '.[0].taskHost' | tr -d '"')

startTimestamp=$(date -d "$taskStartTime" +%s)
endTimestamp=$(date -d "$taskEndTime" +%s)

duration=$((endTimestamp - startTimestamp))
# 将时长转换为小时、分钟和秒
seconds=$((duration % 60))
minutes=$((duration / 60 % 60))
hours=$((duration / 3600))

# 打印时长
if [[ $hours -eq 0 && $minutes -eq 0 ]]; then
runTime="$seconds 秒"
elif [[ $hours -eq 0 ]]; then
runTime="$minutes 分钟 $seconds 秒"
else
runTime="$hours 小时 $minutes 分钟 $seconds 秒"
fi


if [ "${taskState}" = "FAILURE" ]; then
title="<font color=\\\"warning\\\">任务执行失败告警</font>"
else
title="<font color=\\\"info\\\">任务执行预警</font>"
fi

content="**${title}** \\n
>**项目名称**:${projectName}
>**负责人**:${owner}
>**工作流实例**:${processName}
>**工作流Code**:${processDefinitionCode}
>**任务编码**:${taskCode}
>**任务名称**:${taskName}
>**任务类型**:${taskType}
>**执行结果**:<font color=\\\"info\\\">${taskState}</font>
>**执行节点**:${taskHost}
>**开始时间**:${taskStartTime}
>**结束时间**:${taskEndTime}
>**任务耗时**:${runTime}"

fi

json='{"msgtype": "markdown","markdown": {"content": "'"${content}"'"}}'

curl 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=97d2d7e6-6e2c-439a-a830-82886008f76c' \
-H 'Content-Type: application/json' \
-d "$json" >> /srv/udp/2.0.0.0/dolphinscheduler/alert-server/script/output.log

else
echo "数据对象不存在"
fi

exit 0

0x03 配置告警

将上面写好的脚本保存在/srv/udp/2.0.0.0/dolphinscheduler/alert-server/script/wechat.sh中,文件名随意取。

打开大数据调度平台的安全中心菜单,找到告警实例管理,添加告警组件配置。
DS配置机器人

选择shell脚本,实例名称自定义。

插件选择:script

告警类型:sucess 只有成功才发送,failure 只有失败才发送 all 成功失败均发送消息

自定义参数:无实际意义,一般用于三方接口的鉴权等,可指定参数

脚本路径:AlertServer部署服务器上文件存储的路径,也就是上面保存脚本的路径。切记,脚本备份,如果遇到故障,AlterServer可能会迁移到其他节点。

接下来配置告警组管理:

DS配置机器人

保存后,在项目管理的任务流以及任务流调度中,可以配置告警组。

DS配置机器人

一切就绪,上面第一张截图就是执行效果了!