creditReportWorker.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # encoding: utf-8
  2. import threading;
  3. import requests;
  4. import json;
  5. import time;
  6. import datetime;
  7. import pymysql;
  8. import traceback;
  9. import sys;
  10. from io import BytesIO
  11. import os;
  12. import logging;
  13. import timeit;
  14. from urllib import parse;
  15. import random
  16. from ini_op import Config;
  17. import logApi
  18. import shutil
  19. logger = logApi.logger
  20. from pboc.invokePboc import PBOC
  21. base_dir = os.path.dirname(os.path.abspath(__file__))
  22. config = Config(base_dir+"/config.ini");
  23. tokenApiUrl = config.get("baseconf", "tokenApiUrl");
  24. pwd = config.get("baseconf", "pwd");
  25. loanApplyApiUrl = config.get("baseconf", "loanApplyApiUrl");
  26. interval = int(config.get("baseconf", "interval"));
  27. parsePath = config.get("baseconf", "parsePath");
  28. db = config.get("baseconf", "db");
  29. from DBUtils.PooledDB import PooledDB;
  30. pool = PooledDB(pymysql,20,host='localhost',user='root',passwd=pwd,db=db,port=3306,charset="utf8");
  31. from prp import PrpCrypt
  32. from dbController import DbController
  33. dbController = DbController();
# Schedule queries according to the blank-period flag (original note: 根据空白期标志来预约)
  35. class CreditReportWorker(threading.Thread):
  36. def __init__(self,threadname):
  37. threading.Thread.__init__(self, name=threadname)
  38. def getToken(self):
  39. token = ""
  40. headers = {"Content-Type": "application/json",
  41. "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"}
  42. param = {
  43. "grant_type": config.get("baseconf","grant_type"),
  44. "client_id": config.get("baseconf","client_id"),
  45. "client_secret": config.get("baseconf","client_secret"),
  46. "tenant_id": config.get("baseconf","tenant_id"),
  47. "timestamp": int(int(round(time.time() * 1000))),
  48. "nonce": config.get("baseconf","nonce")
  49. }
  50. try:
  51. result = requests.post(tokenApiUrl,json=param,headers=headers,timeout=10)
  52. data = json.loads(result.text)
  53. errorCode = data["errcode"];
  54. if errorCode == '0':
  55. token = data["data"]["access_token"];
  56. except:
  57. logger.error("获取token失败 ")
  58. return token;
  59. #查询合同申请列表
  60. def getLoanApplyList(self,token,pageNo,startId,applyDate,productNums):
  61. data = []
  62. try:
  63. url = loanApplyApiUrl+"?access_token="+token+"&pageNo="+str(pageNo)+"&startId="+str(startId)+"&productNums="+productNums
  64. result = requests.get(url,timeout=10)
  65. # p = PrpCrypt(config.get("baseconf","AESKey"))
  66. key = config.get("baseconf","AESKey")
  67. pboc = PBOC();
  68. # logger.info("token:"+token)
  69. # logger.info(url)
  70. # logger.info(result.text)
  71. resultText = pboc.decrypt(result.text.replace("\"",""),key)
  72. # if resultText.find("]}")>0:
  73. # resultText = resultText[0:resultText.find("]}")+2]
  74. data = json.loads(resultText)
  75. errorCode = data["errcode"];
  76. if errorCode =='0':
  77. data = data["data"]
  78. else:
  79. data = []
  80. except Exception:
  81. logger.error("获取合同申请列表失败 ")
  82. info = sys.exc_info()
  83. logger.error(info[0])
  84. logger.error(info[1])
  85. # logging.log(logging.ERROR, info[2])
  86. logger.error(traceback.extract_tb(info[2], 1))
  87. return data;
  88. #获取最大的业务ID
  89. def getMaxBizId(self):
  90. biz_id = 0;
  91. try:
  92. self.conn = pool.connection();
  93. self.cursor = self.conn.cursor()
  94. sql = "select max(biz_id) from querycustomer";
  95. res = self.cursor.execute(sql);
  96. data = self.cursor.fetchone();
  97. for biz_id in data:
  98. biz_id = biz_id;
  99. except:
  100. logging.log(logging.ERROR,"get start_id list error")
  101. self.cursor.close();
  102. self.conn.close();
  103. if biz_id== None:
  104. biz_id = 0;
  105. return biz_id;
  106. #保存需要查询的客户
  107. def saveQueryCustomer(self,loan,query_ind):
  108. self.conn = pool.connection();
  109. self.cursor = self.conn.cursor()
  110. batch_num = 0
  111. store = ""
  112. # query_ind = "0"
  113. query_time =datetime.datetime.now()
  114. if loan !=None:
  115. cust_name = loan["customerName"]
  116. cerf_id = loan["certificateNum"]
  117. business_num = loan["businessNum"]
  118. customer_num = loan["customerNum"]
  119. coop_business_num = loan["coopBusinessNum"]#接口还没有加
  120. product_num = loan["productNum"]#产品编号
  121. biz_id = loan["id"]
  122. logger.info(loan)
  123. try:
  124. self.cursor.execute('insert into querycustomer(cerf_id,cust_name,store,batch_num,query_ind,query_time,biz_id,customer_num,business_num,coop_business_num,product_num) '
  125. 'values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)',(cerf_id,cust_name,store,batch_num,query_ind,query_time,biz_id,customer_num,business_num,coop_business_num,product_num))
  126. self.conn.commit()
  127. except Exception:
  128. # 如果发生错误则回滚
  129. info = sys.exc_info()
  130. logger.error(info[0])
  131. logger.error(info[1])
  132. # logging.log(logging.ERROR, info[2])
  133. logger.error(traceback.extract_tb(info[2], 1))
  134. logging.log(logging.ERROR, "saveQueryCustomer error")
  135. # 关闭数据库连接
  136. self.cursor.close();
  137. self.conn.close();
  138. return;
  139. def run(self):
  140. logger.info("征信报告查询客户服务启动...");
  141. while True:
  142. cur_time = time.time();
  143. cur_hour = time.strftime('%H', time.localtime(time.time()));
  144. cur_min = time.strftime('%M', time.localtime(time.time()));
  145. cur_secd = time.strftime('%S', time.localtime(time.time()));
  146. #获取token
  147. token = self.getToken();
  148. startId = self.getMaxBizId();
  149. productNums = config.get("baseconf", "productNums");
  150. loanApplyList = self.getLoanApplyList(token,1,startId,None,productNums)
  151. for loan in loanApplyList:
  152. #一个月内已经查询过一次
  153. cust_name = loan["customerName"]
  154. cerf_id = loan["certificateNum"]
  155. if not dbController.checkLast1Month(cerf_id):
  156. self.saveQueryCustomer(loan,"0")
  157. else:
  158. self.saveQueryCustomer(loan, "1")
  159. logger.info(cerf_id+"一个内已经查询过一次")
  160. #从执行目录下还原
  161. descPdfPath = parsePath+"/"+cust_name+"_"+cerf_id+".pdf";
  162. txtPath = parsePath + "/" + cust_name + "_" + cerf_id + ".txt";
  163. pdf_path = parsePath+"/execed_new/"+cust_name+"_"+cerf_id+".pdf";
  164. try:
  165. shutil.rmtree(txtPath,ignore_errors=True)
  166. shutil.move(pdf_path,descPdfPath)
  167. except:
  168. logger.error("移动文件失败"+pdf_path)
  169. time.sleep(interval)