# creditReportWorker.py  0708  6.1 KB
  1. # encoding: utf-8
  2. import threading;
  3. import requests;
  4. import json;
  5. import time;
  6. import datetime;
  7. import pymysql;
  8. import traceback;
  9. import sys;
  10. from io import BytesIO
  11. import os;
  12. import logging;
  13. import timeit;
  14. from urllib import parse;
  15. import random
  16. from ini_op import Config;
  17. import logApi
  18. logger = logApi.logger
  19. from pboc.invokePboc import PBOC
  20. base_dir = os.path.dirname(os.path.abspath(__file__))
  21. config = Config(base_dir+"/config.ini");
  22. tokenApiUrl = config.get("baseconf", "tokenApiUrl");
  23. pwd = config.get("baseconf", "pwd");
  24. loanApplyApiUrl = config.get("baseconf", "loanApplyApiUrl");
  25. interval = int(config.get("baseconf", "interval"));
  26. db = config.get("baseconf", "db");
  27. from DBUtils.PooledDB import PooledDB;
  28. pool = PooledDB(pymysql,20,host='localhost',user='root',passwd=pwd,db=db,port=3306,charset="utf8");
  29. from prp import PrpCrypt
# Schedule the credit-report queries according to the blank-period flag.
  31. class CreditReportWorker(threading.Thread):
  32. def __init__(self,threadname):
  33. threading.Thread.__init__(self, name=threadname)
  34. def getToken(self):
  35. token = ""
  36. headers = {"Content-Type": "application/json",
  37. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"}
  38. param = {
  39. "grant_type": config.get("baseconf","grant_type"),
  40. "client_id": config.get("baseconf","client_id"),
  41. "client_secret": config.get("baseconf","client_secret"),
  42. "tenant_id": config.get("baseconf","tenant_id"),
  43. "timestamp": int(int(round(time.time() * 1000))),
  44. "nonce": config.get("baseconf","nonce")
  45. }
  46. try:
  47. result = requests.post(tokenApiUrl,json=param,headers=headers,timeout=10)
  48. data = json.loads(result.text)
  49. errorCode = data["errcode"];
  50. if errorCode == '0':
  51. token = data["data"]["access_token"];
  52. except:
  53. logger.error("获取token失败 ")
  54. return token;
  55. #查询合同申请列表
  56. def getLoanApplyList(self,token,pageNo,startId,applyDate,productNums):
  57. data = []
  58. try:
  59. url = loanApplyApiUrl+"?access_token="+token+"&pageNo="+str(pageNo)+"&startId="+str(startId)+"&productNums="+productNums
  60. result = requests.get(url,timeout=10)
  61. # p = PrpCrypt(config.get("baseconf","AESKey"))
  62. key = config.get("baseconf","AESKey")
  63. pboc = PBOC("", "");
  64. # logger.info("token:"+token)
  65. # logger.info(url)
  66. # logger.info(result.text)
  67. resultText = pboc.decrypt(result.text.replace("\"",""),key)
  68. # if resultText.find("]}")>0:
  69. # resultText = resultText[0:resultText.find("]}")+2]
  70. data = json.loads(resultText)
  71. errorCode = data["errcode"];
  72. if errorCode =='0':
  73. data = data["data"]
  74. else:
  75. data = []
  76. except Exception:
  77. logger.error("获取合同申请列表失败 ")
  78. info = sys.exc_info()
  79. logger.error(info[0])
  80. logger.error(info[1])
  81. # logging.log(logging.ERROR, info[2])
  82. logger.error(traceback.extract_tb(info[2], 1))
  83. return data;
  84. #获取最大的业务ID
  85. def getMaxBizId(self):
  86. biz_id = 0;
  87. try:
  88. self.conn = pool.connection();
  89. self.cursor = self.conn.cursor()
  90. sql = "select max(biz_id) from querycustomer";
  91. res = self.cursor.execute(sql);
  92. data = self.cursor.fetchone();
  93. for biz_id in data:
  94. biz_id = biz_id;
  95. except:
  96. logging.log(logging.ERROR,"get start_id list error")
  97. self.cursor.close();
  98. self.conn.close();
  99. if biz_id== None:
  100. biz_id = 0;
  101. return biz_id;
  102. #保存需要查询的客户
  103. def saveQueryCustomer(self,loan):
  104. self.conn = pool.connection();
  105. self.cursor = self.conn.cursor()
  106. batch_num = 0
  107. store = ""
  108. query_ind = "0"
  109. query_time =datetime.datetime.now()
  110. if loan !=None:
  111. cust_name = loan["customerName"]
  112. cerf_id = loan["certificateNum"]
  113. business_num = loan["businessNum"]
  114. customer_num = loan["customerNum"]
  115. coop_business_num = loan["coopBusinessNum"]#接口还没有加
  116. product_num = loan["productNum"]#产品编号
  117. biz_id = loan["id"]
  118. logger.info(loan)
  119. try:
  120. self.cursor.execute('insert into querycustomer(cerf_id,cust_name,store,batch_num,query_ind,query_time,biz_id,customer_num,business_num,coop_business_num,product_num) '
  121. 'values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',(cerf_id,cust_name,store,batch_num,query_ind,query_time,biz_id,customer_num,business_num,coop_business_num,product_num))
  122. self.conn.commit()
  123. except Exception:
  124. # 如果发生错误则回滚
  125. info = sys.exc_info()
  126. logger.error(info[0])
  127. logger.error(info[1])
  128. # logging.log(logging.ERROR, info[2])
  129. logger.error(traceback.extract_tb(info[2], 1))
  130. logging.log(logging.ERROR, "saveQueryCustomer error")
  131. # 关闭数据库连接
  132. self.cursor.close();
  133. self.conn.close();
  134. return;
  135. def run(self):
  136. logger.info("征信报告查询客户服务启动...");
  137. while True:
  138. cur_time = time.time();
  139. cur_hour = time.strftime('%H', time.localtime(time.time()));
  140. cur_min = time.strftime('%M', time.localtime(time.time()));
  141. cur_secd = time.strftime('%S', time.localtime(time.time()));
  142. #获取token
  143. token = self.getToken();
  144. startId = self.getMaxBizId();
  145. productNums = config.get("baseconf", "productNums");
  146. loanApplyList = self.getLoanApplyList(token,1,startId,None,productNums)
  147. for loan in loanApplyList:
  148. self.saveQueryCustomer(loan)
  149. time.sleep(interval)