79 views

操作ES集群之Python Elasticsearch API

By | 2019年2月28日

今天有使用我们公司高防TTCDN的客户有提到过他们技术部遇到这么一个难题,怎么用Python Elasticsearch API操作ES集群,我们技术部的小张特意整理了下相关资料,给大家也分享分享:

一、环境配置
1)Centos 7.4
2)Python 2.7
3)Pip 2.7 MySQL-python 1.2.5 Elasticsearc 6.3.1
4)Elasitcsearch6.3.2

1.废话就不多说了,直接上菜:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#minyt 2018.9.1
#获取24小时内出现的模块次数
# 该程序通过elasticsearch python client 获取相关精简数据,可以计算请求数、超时数、错误数、正确率、错误率等等
import MySQLdb
from elasticsearch import Elasticsearch
from elasticsearch import helpers

#定义elasticsearch集群索引名
index_name = “logstash-nginxlog-*”

#实例化Elasticsearch类,并设置超时间为180秒,默认是10秒的,如果数据量很大,时间设置更长一些
es = Elasticsearch([‘elasticsearch01′,’elasticsearch02′,’elasticsearch03′],timeout=180)

#DSL(领域特定语言)查询语法,查询top50 sname的排列次数
data_sname = {
“aggs”: {
“2”: {
“terms”: {
“field”: “apistatus.sname.keyword”,
“size”: 100,
“order”: {
“_count”: “desc”
}
}
}
},
“size”: 0,
“_source”: {
“excludes”: []
},
“stored_fields”: [
“*”
],
“script_fields”: {},
“docvalue_fields”: [
“@timestamp”
],
“query”: {
“bool”: {
“must”: [
{
“match_all”: {}
},
{
“range”: {
“@timestamp”: {
“gte” : “now-24h/h”,
“lt” : “now/h”
}
}
}
],
“filter”: [],
“should”: [],
“must_not”: []
}
}
}

#按照DSL(特定领域语言)语法查询获取数据
def get_original_data():
try:
#根据上面条件搜索数据
res = es.search(
index=index_name,
size=0,
body=data_sname
)
return res

except:
print “get original data failure”

#初始化数据库
def init_mysql():
# 打开数据库连接
db = MySQLdb.connect(“localhost”, “myuser”, “mypassword”, “mydb”, charset=’utf8′ )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 更新语句
sql = “update appname set count=0”
try:
# 执行SQL语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()

# 关闭数据库连接
db.close()

def updata_mysql(sname_count,sname_list):
# 打开数据库连接
db = MySQLdb.connect(“localhost”, “myuser”, “mypassword”, “mydb”, charset=’utf8’ )

# 使用cursor()方法获取操作游标
cursor = db.cursor()

# SQL 更新语句
sql = “update appname set count=%d where sname = ‘%s'” % (sname_count,sname_list)
try:
# 执行SQL语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()

# 关闭数据库连接
db.close()

#根据Index数据结构通过Elasticsearch Python Client上传数据到新的Index
def import_process_data():
try:
#列表形式显示结果
res = get_original_data()
#print res
res_list = res.get(‘aggregations’).get(‘2’).get(‘buckets’)
#print res_list

#初始化数据库
init_mysql()

#获取24小时内出现的SNAME
for value in res_list:
sname_list = value.get(‘key’)
sname_count = value.get(‘doc_count’)
print sname_list,sname_count
#更新sname_status值
updata_mysql(sname_count,sname_list)

except Exception, e:
print repr(e)

if __name__ == “__main__”:
import_process_data()

总结:

主要是DSL语法的编写涉及到查询和聚合,我们可以通过devtool先进行测试,得到正确语法,再结合Python对字典、列表、除法、字符串等操作,下面总结了几个算法:

1)总请求
http_host.keyword: api.mydomain.com

2)超长请求
http_host.keyword: api.mydomain.com AND request_time: [1 TO 600] NOT apistatus.status.keyword:*错误

3)错误请求
apistatus.status.keyword:*错误 AND (http_host.keyword: api.mydomain.com OR http_host.keyword: api.yourdomain.com )

4)请求健康度
域名与request_time聚合,域名请求时间小于3秒的次数除以总请求次数对应各个域名健康度

5)请求正确率
域名与http状态码聚合,域名http状态码为200的次数除以域名总请求数对应各个域名的请求正确率

感谢大家的阅读,我们也会给大家提供更加有帮助的分享,希望大家觉得有用处,祝大家工作顺利!

本文转载于:http://win-man.com
本文关键词:云主机     云服务器     BGP服务器     高防服务器
作者:云服务器安全专家

发表评论

电子邮件地址不会被公开。 必填项已用*标注