# creditReportWorker.py  (0604, 5.7 KB)
  1. # encoding: utf-8
  2. import threading;
  3. import requests;
  4. import json;
  5. import time;
  6. import datetime;
  7. import pymysql;
  8. import traceback;
  9. import sys;
  10. from io import BytesIO
  11. import os;
  12. import logging;
  13. import timeit;
  14. from urllib import parse;
  15. import random
  16. from ini_op import Config;
  17. import logApi
  18. logger = logApi.logger
  19. base_dir = os.path.dirname(os.path.abspath(__file__))
  20. config = Config(base_dir+"/config.ini");
  21. tokenApiUrl = config.get("baseconf", "tokenApiUrl");
  22. loanApplyApiUrl = config.get("baseconf", "loanApplyApiUrl");
  23. interval = int(config.get("baseconf", "interval"));
  24. from DBUtils.PooledDB import PooledDB;
  25. pool = PooledDB(pymysql,20,host='localhost',user='root',passwd='root',db='crt',port=3306,charset="utf8");
  26. from prp import PrpCrypt
# Schedule according to the blank-period flag (translated from the original comment).
  28. class CreditReportWorker(threading.Thread):
  29. def __init__(self,threadname):
  30. threading.Thread.__init__(self, name=threadname)
  31. def getToken(self):
  32. token = ""
  33. headers = {"Content-Type": "application/json",
  34. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"}
  35. param = {
  36. "grant_type": config.get("baseconf","grant_type"),
  37. "client_id": config.get("baseconf","client_id"),
  38. "client_secret": config.get("baseconf","client_secret"),
  39. "tenant_id": config.get("baseconf","tenant_id"),
  40. "timestamp": int(int(round(time.time() * 1000))),
  41. "nonce": config.get("baseconf","nonce")
  42. }
  43. try:
  44. result = requests.post(tokenApiUrl,json=param,headers=headers,timeout=10)
  45. data = json.loads(result.text)
  46. errorCode = data["errcode"];
  47. if errorCode == '0':
  48. token = data["data"]["access_token"];
  49. except:
  50. logger.error("获取token失败 ")
  51. return token;
  52. #查询合同申请列表
  53. def getLoanApplyList(self,token,pageNo,startId,applyDate,productNums):
  54. data = []
  55. try:
  56. url = loanApplyApiUrl+"?access_token="+token+"&pageNo="+str(pageNo)+"&startId="+str(startId)+"&productNums="+productNums
  57. result = requests.get(url,timeout=10)
  58. p = PrpCrypt(config.get("baseconf","AESKey"))
  59. # logger.info("token:"+token)
  60. # logger.info(url)
  61. # logger.info(result.text)
  62. resultText = p.decrypt(result.text)
  63. if resultText.find("]}")>0:
  64. resultText = resultText[0:resultText.find("]}")+2]
  65. data = json.loads(resultText)
  66. errorCode = data["errcode"];
  67. if errorCode =='0':
  68. data = data["data"]
  69. else:
  70. data = []
  71. except Exception:
  72. logger.error("获取合同申请列表失败 ")
  73. info = sys.exc_info()
  74. logger.error(info[0])
  75. logger.error(info[1])
  76. # logging.log(logging.ERROR, info[2])
  77. logger.error(traceback.extract_tb(info[2], 1))
  78. return data;
  79. #获取最大的业务ID
  80. def getMaxBizId(self):
  81. biz_id = 0;
  82. try:
  83. self.conn = pool.connection();
  84. self.cursor = self.conn.cursor()
  85. sql = "select max(biz_id) from querycustomer";
  86. res = self.cursor.execute(sql);
  87. data = self.cursor.fetchone();
  88. for biz_id in data:
  89. biz_id = biz_id;
  90. except:
  91. logging.log(logging.ERROR,"get start_id list error")
  92. self.cursor.close();
  93. self.conn.close();
  94. if biz_id== None:
  95. biz_id = 0;
  96. return biz_id;
  97. #保存需要查询的客户
  98. def saveQueryCustomer(self,loan):
  99. self.conn = pool.connection();
  100. self.cursor = self.conn.cursor()
  101. batch_num = 0
  102. store = ""
  103. query_ind = "0"
  104. query_time =datetime.datetime.now()
  105. if loan !=None:
  106. cust_name = loan["customerName"]
  107. cerf_id = loan["certificateNum"]
  108. business_num = loan["businessNum"]
  109. customer_num = loan["customerNum"]
  110. biz_id = loan["id"]
  111. logger.info("业务编号:"+business_num)
  112. try:
  113. self.cursor.execute('insert into querycustomer(cerf_id,cust_name,store,batch_num,query_ind,query_time,biz_id,customer_num,business_num) '
  114. 'values(%s,%s,%s,%s,%s,%s,%s,%s,%s)',(cerf_id,cust_name,store,batch_num,query_ind,query_time,biz_id,customer_num,business_num))
  115. self.conn.commit()
  116. except Exception:
  117. # 如果发生错误则回滚
  118. info = sys.exc_info()
  119. logger.error(info[0])
  120. logger.error(info[1])
  121. # logging.log(logging.ERROR, info[2])
  122. logger.error(traceback.extract_tb(info[2], 1))
  123. logging.log(logging.ERROR, "saveQueryCustomer error")
  124. # 关闭数据库连接
  125. self.cursor.close();
  126. self.conn.close();
  127. return;
  128. def run(self):
  129. logger.info("征信报告查询客户服务启动...");
  130. while True:
  131. cur_time = time.time();
  132. cur_hour = time.strftime('%H', time.localtime(time.time()));
  133. cur_min = time.strftime('%M', time.localtime(time.time()));
  134. cur_secd = time.strftime('%S', time.localtime(time.time()));
  135. #获取token
  136. token = self.getToken();
  137. startId = self.getMaxBizId();
  138. productNums = config.get("baseconf", "productNums");
  139. loanApplyList = self.getLoanApplyList(token,1,startId,None,productNums)
  140. for loan in loanApplyList:
  141. self.saveQueryCustomer(loan)
  142. time.sleep(interval)