GrayLog is a rising star among log-aggregation tools: an open-source tool for log aggregation, analysis, auditing, display, and alerting. Functionally it is similar to the ELK stack, but considerably simpler. Its leaner design, efficiency, and ease of deployment have quickly won it a large following.

This article does not cover GrayLog's workflow or architecture; it only introduces three of its data-query endpoints.

The endpoints

Querying logs up to an end time

The api/load_logs_with_time endpoint returns the latest logs as of the current time (if end_time is supplied in the query, "latest" means the 400 entries immediately before end_time). It is a GET endpoint, so when scripting against it the query parameters must be URL-encoded.

It takes three parameters:

  • start_time: start of the time range; optional
  • end_time: end of the time range; optional
  • query: the search condition; optional. A condition of the form fieldA:value1 AND fieldB:value2 means fieldA contains value1 and fieldB contains value2; OR can be used instead of AND, and the two fields may be the same. For example, app:demo AND message:"测试" AND message:"youzan" finds logs from the application demo whose message contains both 测试 and youzan.
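As a sketch of how the query string is encoded for the GET call (the host name is a placeholder, matching the one used in the script in this article):

```python
from urllib import parse

# Example query from the text above; the host is a placeholder.
query = 'app:demo AND message:"测试" AND message:"youzan"'
url = ("http://graylog.proxy.xxx.cn/api/load_logs_with_time?query="
       + parse.quote(query))
print(url)
```

parse.quote percent-encodes the spaces, quotes, and non-ASCII characters so the condition survives the trip through the URL.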

Querying older logs

The api/load_older_logs endpoint pages backward in time. How far back it starts is bounded by end_time, which is normally the timestamp of the first message returned by the initial load_logs_with_time call, i.e. that call's starting timestamp. Note that the returned timestamps may be in UTC, eight hours behind Beijing time.
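Converting such a UTC timestamp to Beijing time only needs an eight-hour offset; for example:

```python
from datetime import datetime, timedelta

ts = "2022-04-28T20:50:25.000Z"  # timestamp format returned by the API
utc = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")
beijing = utc + timedelta(hours=8)  # Beijing is UTC+8
print(beijing)  # 2022-04-29 04:50:25
```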

This endpoint uses POST.
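A minimal sketch of the request body (field names follow the script in this article; the millisecond timestamp value and the way excluded IDs are joined are assumptions taken from that script, not documented API behavior):

```python
def build_older_logs_payload(query, exclude_ids, end_time_ms):
    """Build the form payload for api/load_older_logs.

    exclude_ids holds the _id values of messages already fetched,
    so the server does not return them again.
    """
    return {
        "query": query,
        "exclude": "".join(exclude_ids),
        "end_time": end_time_ms,
    }

payload = build_older_logs_payload(
    'message:"Antifraud End"', ["id1", "id2"], 1651168225000)
print(payload["end_time"])  # 1651168225000
```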

Querying newer logs

The api/load_newer_logs endpoint pages forward to fetch the newest data; it is used in the same way as api/load_older_logs.

The automation script

The script's job is to extract request latency. Our log lines look like this: [[Antifraud End][NalsGleaner service]]: partnerCode=qianshou,eventId=Post_web_20220118,seqId=1650927537304981F81A11AF86014604, cost=18ms. By searching for the keyword "Antifraud End" together with a partner's name, we can find the latency of every call made by that partner.

To support other log formats, simply override the analysis method.
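For instance, a regex-based variant of the parsing logic could look like this (the pattern is a sketch that matches only the sample line's layout shown above):

```python
import re

# Matches the sample log format from this article; adjust for other layouts.
LOG_RE = re.compile(r"partnerCode=(\w+),eventId=(\w+),seqId=(\w+), cost=(\d+)ms")

def analysis(line):
    """Return (partnerCode, eventId, cost_ms), or None if the line doesn't match."""
    m = LOG_RE.search(line)
    if not m:
        return None
    return m.group(1), m.group(2), int(m.group(4))

line = ("[[Antifraud End][NalsGleaner service]]: partnerCode=qianshou,"
        "eventId=Post_web_20220118,seqId=1650927537304981F81A11AF86014604, cost=18ms")
print(analysis(line))  # ('qianshou', 'Post_web_20220118', 18)
```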

import requests
import time
from datetime import datetime, timedelta
from urllib import parse
import json


class GrayLog(object):
    """GrayLog log analysis."""

    def __init__(self, cookie, auth):
        self.cookie = cookie
        self.auth = auth
        self.first_url = "http://graylog.proxy.xxx.cn/api/load_logs_with_time"
        self.load_url = "http://graylog.proxy.xxx.cn/api/load_newer_logs"
        self.load_old_url = "https://graylog.proxy.xxx.cn/api/load_older_logs"
        self.headers = {
            'Cookie': self.cookie,
            'Authorization': self.auth,
            'Referer': 'http://graylog.proxy.xxx.cn/',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36'
        }
        self.excludes = []   # _id values of messages already fetched
        self.data = {}       # eventId -> list of costs (ms)
        self.to = None       # timestamp of the newest message seen
        self.froms = None    # timestamp of the oldest message seen

    def currentPage(self, app=None, query=None, start_time=None, end_time=None):
        """Fetch the first page of results."""
        url = self.first_url + "?"
        qry = ""
        if app:
            qry = "app:\"" + app + "\""
        if query:
            qry = qry + " AND " + query
        if qry:
            url = url + "query=" + parse.quote(qry)
        if start_time:
            start = int(time.mktime(time.strptime(
                start_time, "%Y-%m-%d %H:%M:%S"))) * 1000
            url = url + "&start_time=" + str(start)
        if end_time:
            end = int(time.mktime(time.strptime(
                end_time, "%Y-%m-%d %H:%M:%S"))) * 1000
            url = url + "&end_time=" + str(end)
        url = url + "&size=1000"

        response = requests.get(url, headers=self.headers)
        jsonobj = json.loads(response.text)

        is_first_message = False
        for message in jsonobj['messages']:
            msg = message['message']
            self.excludes.append(msg['_id'])
            if not is_first_message:
                self.to = msg['timestamp']   # newest message comes first
                is_first_message = True
            self.froms = msg['timestamp']    # last iteration leaves the oldest
            self.analysis(msg['message'])

    def analysis(self, msg):
        """Parse one log line and record its cost under its eventId."""
        if not msg:
            return
        event_id = ""
        cost = 0
        for part in msg.split(","):
            sp = part.split("=")
            if len(sp) != 2:
                continue
            if sp[0].strip() == 'eventId':
                event_id = sp[1]
            if sp[0].strip() == 'cost':
                cost = int(sp[1].strip("ms"))
        if event_id:
            self.data.setdefault(event_id, []).append(cost)

    def loadOlderLogs(self, is_need_load_older_logs, query, start_time):
        """Page backward until start_time is reached.

        GrayLog returns the newest matching logs first, so older
        pages must be fetched explicitly.

        :param is_need_load_older_logs: whether to load older logs at all
        :param start_time: earliest time to page back to
        """
        if not is_need_load_older_logs or not self.to:
            return
        start = int(time.mktime(time.strptime(
            start_time, "%Y-%m-%d %H:%M:%S"))) * 1000
        # timestamps look like 2022-04-28T20:50:25.000Z (UTC, 8 hours behind)
        localtime = datetime.strptime(
            self.to, "%Y-%m-%dT%H:%M:%S.%fZ") + timedelta(hours=8)
        end = int(time.mktime(localtime.timetuple()) * 1000)
        while start <= end:
            payload = {
                "query": query,
                "exclude": "".join(self.excludes),
                "end_time": end
            }
            response = requests.post(
                self.load_old_url, headers=self.headers, data=payload)
            jsonobj = json.loads(response.text)
            if "total_results" in jsonobj and int(jsonobj['total_results']) == 0:
                break
            is_first_message = False
            for message in jsonobj['messages']:
                msg = message['message']
                self.excludes.append(msg['_id'])
                if not is_first_message:
                    self.to = msg['timestamp']
                    is_first_message = True
                    # move the window back to this page's newest message
                    localtime = datetime.strptime(
                        self.to, "%Y-%m-%dT%H:%M:%S.%fZ") + timedelta(hours=8)
                    end = int(time.mktime(localtime.timetuple()) * 1000)
                self.froms = msg['timestamp']
                self.analysis(msg['message'])

    def loadNewerLogs(self, is_need_load_newer_logs, query, end_time):
        """Page forward until end_time is reached.

        :param is_need_load_newer_logs: whether to load newer logs at all
        :param end_time: latest time to page forward to
        """
        if not is_need_load_newer_logs or not self.froms:
            return
        end = int(time.mktime(time.strptime(
            end_time, "%Y-%m-%d %H:%M:%S"))) * 1000
        # timestamps look like 2022-04-28T20:50:25.000Z (UTC, 8 hours behind)
        start = int(time.mktime((datetime.strptime(
            self.froms, "%Y-%m-%dT%H:%M:%S.%fZ") + timedelta(hours=8)).timetuple()) * 1000)
        while end >= start:
            payload = {
                "query": query,
                "exclude": "".join(self.excludes),
                "start_time": start
            }
            response = requests.post(
                self.load_url, headers=self.headers, data=payload)
            jsonobj = json.loads(response.text)
            if "total_results" in jsonobj and int(jsonobj['total_results']) == 0:
                break
            # TODO: process messages and advance start, as in loadOlderLogs
            break


if __name__ == '__main__':
    cookie = "your cookie here"
    auth = "Basic anVud2VuLnpob3U6cmVwb332QHBhRA=="
    start_time = "2022-04-29 02:10:57"
    end_time = "2022-04-29 06:10:25"
    query = "message:\"Antifraud End\" AND message:\"qianshou\""
    is_need_load_newer_logs = True
    is_need_load_older_logs = True
    app = "your app name here"

    graylog = GrayLog(cookie, auth)
    graylog.currentPage(app, query, start_time, end_time)
    graylog.loadOlderLogs(is_need_load_older_logs, query, start_time)
    # graylog.loadNewerLogs(is_need_load_newer_logs, query, end_time)
    print(json.dumps(graylog.data))