#!/usr/bin/python # -*- coding:utf-8 -*- ''' 功能: 1. 取消内存占用超过一定值 执行时间超过一定值 查杀某些用户的任务 排除某些查询ID 用例: #取消查询QUERY,用户是zhagnwei 超过时间:10s 内存超过1E7 python cancle_impala_task.py 1E7 10000 "select" "zhangsan" "" #取消查询DDL,用户是zhagnwei 超过时间:10s python cancle_impala_task.py "" 10000 "invalidat" "zhangsan" "" #取消查询DDL,用户是zhagnwei 超过时间:10s 排除查询ID python cancle_impala_task.py "" 10000 "invalidat" "zhangsan" "000000000e000000:2000000000000000" 说明: 1E8 = 100M 1E9 = 1G 1E10 = 10G 1E11 = 100G ''' import sys import commands cmip = "192.168.1.120" cmuser = "zhangsan" cmpasswd = "11111111" # 获取参数 if len(sys.argv) == 1 or len(sys.argv) >= 7: print "parameters is illegal." sys.exit(1) #使用内存设置 if len(sys.argv) >= 2: maxMemory = sys.argv[1] print "maxMemory:", maxMemory #超时时间设置 if len(sys.argv) >= 3: timeout = sys.argv[2] print "timeout:", timeout #查询语句 if len(sys.argv) >= 4: querytype = sys.argv[3] querytypelist = querytype.split(",") print "querytypelist:", querytypelist #用户设置 if len(sys.argv) >= 5: userlist = sys.argv[4] whitelistuser = userlist.split(",") print "exe user white list:", whitelistuser #会话ID设置 if len(sys.argv) == 6: idlist = sys.argv[5] whitelistid = idlist.split(",") print "query id white list:", whitelistid # 解析参数--内存限制 if maxMemory != "": maxMemory = "%20and%20memory_aggregate_peak>" + maxMemory # 解析参数--用户限制 utillist = [] if whitelistuser != "": for user in whitelistuser: utillist.append("user="+user) userstr = "%20and%20".join(utillist) if userstr != "": userstr = "%20and%20(" + userstr + ")" else: userstr="" # 解析参数--时间限制 if timeout != "": timeout = "%20and%20(query_duration>=" + timeout + ")" # 解析参数--查询类型限制 utillist1 = [] for type in querytypelist: if type.upper() == "SELECT": utillist1.append("query_type=QUERY") elif type.upper() == "INSERT": utillist1.append("query_type=QUERY") elif type.upper() == "INVALIDATE": utillist1.append("query_type=DDL") else: print "non-supported query type:", type querytypestr = "%20or%20".join(utillist1) print querytypestr if querytypestr != "": querytypestr = "%20and%20(" + querytypestr + ")" # 查询任务 cmd = "curl -u {cmuser}:{cmpasswd} 'http://{cmip}:7180/api/v9/clusters/Cluster1234/services/impala/impalaQueries?filter=(executing=true{maxMemory}{userstr}{timeout}{querytypestr})'".format(cmuser=cmuser, cmpasswd=cmpasswd, cmip=cmip, maxMemory=maxMemory, userstr=userstr, timeout=timeout, querytypestr=querytypestr) print "query job cmd:", cmd status, output = commands.getstatusoutput(cmd) if status == 0: print "query job successful." else: print "query job failed." sys.exit(1) print output # 解析json count = output.count(""queryId" : "") print "job count will be killed:", count if count == 0: print "no result. 没有任务需要删除" sys.exit(0) #是否在白名单中 strList = output_result.split(""queryId" : "")[1:] idList = [] for str in strList: id = str.split(""")[0] idList.append(id) print "query id list will be killed:", idList #执行取消任务 i = 1 for queryId in idList: if queryId in whitelistid: print "white list query id:", queryId print "不进行中断查询,跳过..." continue print " No.{num} queryId: {queryId}".format(queryId=queryId, num=i) cmd = "curl -X POST -u {cmuser}:{cmpasswd} 'http://{cmip}:7180/api/v9/clusters/Cluster1234/services/impala/impalaQueries/{queryId}/cancel'".format(cmuser=cmuser, cmpasswd=cmpasswd, cmip=cmip, queryId=queryId) print "cancel job cmd:", cmd status, output = commands.getstatusoutput(cmd) print output if status == 0: print "cancel job successful." else: print output print "cancel job failed." i+=1 print " Program Execute Successful."

