123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- # encoding: utf-8
- import threading;
- import requests;
- import json;
- import time;
- import datetime;
- import pymysql;
- import traceback;
- import sys;
- from io import BytesIO
- import os;
- import logging;
- import timeit;
- from urllib import parse;
- import random
- from ini_op import Config;
- import logApi
- import shutil
# Shared logger object exposed by the project's logApi module.
logger = logApi.logger
from pboc.invokePboc import PBOC

# config.ini is loaded from the directory that contains this file.
base_dir = os.path.dirname(os.path.abspath(__file__))
config = Config(base_dir + "/config.ini")


def _baseconf(option):
    """Return *option* from the ``baseconf`` section of config.ini."""
    return config.get("baseconf", option)


tokenApiUrl = _baseconf("tokenApiUrl")
pwd = _baseconf("pwd")
loanApplyApiUrl = _baseconf("loanApplyApiUrl")
# Passed to time.sleep() between polling cycles, i.e. a number of seconds.
interval = int(_baseconf("interval"))
parsePath = _baseconf("parsePath")
db = _baseconf("db")

from DBUtils.PooledDB import PooledDB

# Module-wide MySQL connection pool.  Only the password and database name come
# from config.ini; host, user and port are hard-coded here.
# NOTE(review): 20 is PooledDB's second positional argument (presumably
# ``mincached`` -- confirm against the installed DBUtils version).
pool = PooledDB(
    pymysql,
    20,
    host='localhost',
    user='root',
    passwd=pwd,
    db=db,
    port=3306,
    charset="utf8",
)
from prp import PrpCrypt
from dbController import DbController

dbController = DbController()
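# Schedule according to the blank-period flag.
# NOTE(review): this comment does not obviously describe the class that follows; confirm or remove.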
class CreditReportWorker(threading.Thread):
    """Worker thread that polls the loan-application API and queues customers.

    Every cycle the worker requests an access token, reads the largest
    ``biz_id`` already stored in the ``querycustomer`` table, passes it to the
    loan-application API as ``startId`` and inserts one ``querycustomer`` row
    per returned application.  It then sleeps ``interval`` seconds.
    """

    def __init__(self, threadname):
        """Create the worker; *threadname* becomes the thread's name."""
        threading.Thread.__init__(self, name=threadname)

    @staticmethod
    def _release(cursor, conn):
        """Close *cursor* and *conn*, skipping whichever is ``None``."""
        for resource in (cursor, conn):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception:
                # A failed close must not mask the outcome of the DB operation.
                logger.error(traceback.format_exc())

    def getToken(self):
        """Request an access token from ``tokenApiUrl``.

        Returns:
            The ``access_token`` from the response when its ``errcode`` is
            ``'0'``; an empty string when the request fails, the response
            cannot be parsed, or ``errcode`` has any other value.
        """
        token = ""
        headers = {
            "Content-Type": "application/json",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        }
        param = {
            "grant_type": config.get("baseconf", "grant_type"),
            "client_id": config.get("baseconf", "client_id"),
            "client_secret": config.get("baseconf", "client_secret"),
            "tenant_id": config.get("baseconf", "tenant_id"),
            # Current time in milliseconds since the epoch.
            "timestamp": int(round(time.time() * 1000)),
            "nonce": config.get("baseconf", "nonce"),
        }
        try:
            result = requests.post(tokenApiUrl, json=param, headers=headers, timeout=10)
            data = json.loads(result.text)
            if data["errcode"] == '0':
                token = data["data"]["access_token"]
        except Exception:
            logger.error("获取token失败 ")
            logger.error(traceback.format_exc())
        return token

    # Query the loan-application list.
    def getLoanApplyList(self, token, pageNo, startId, applyDate, productNums):
        """Fetch and decrypt one page of loan applications.

        Args:
            token: access token appended as ``access_token``.
            pageNo: page number appended as ``pageNo``.
            startId: value appended as ``startId``.
            applyDate: not used by this method; kept so existing callers
                keep working.
            productNums: string appended as ``productNums``.

        Returns:
            The ``data`` member of the decrypted response when its
            ``errcode`` is ``'0'``; otherwise an empty list.  Always
            returns something iterable, even when the request, decryption or
            parsing fails.
        """
        try:
            url = (
                loanApplyApiUrl
                + "?access_token=" + token
                + "&pageNo=" + str(pageNo)
                + "&startId=" + str(startId)
                + "&productNums=" + productNums
            )
            result = requests.get(url, timeout=10)
            key = config.get("baseconf", "AESKey")
            # All double quotes are stripped from the body before decrypting.
            resultText = PBOC().decrypt(result.text.replace("\"", ""), key)
            payload = json.loads(resultText)
            if payload["errcode"] == '0':
                # Guard against a null "data" member so callers can iterate.
                return payload["data"] or []
            return []
        except Exception:
            # Never hand the half-parsed response back to the caller.
            logger.error("获取合同申请列表失败 ")
            logger.error(traceback.format_exc())
            return []

    # Get the largest business id.
    def getMaxBizId(self):
        """Return ``max(biz_id)`` from ``querycustomer``.

        Returns:
            The largest stored ``biz_id``, or ``0`` when the table has no
            rows (the aggregate is NULL) or the query fails.
        """
        biz_id = 0
        conn = None
        cursor = None
        try:
            conn = pool.connection()
            cursor = conn.cursor()
            cursor.execute("select max(biz_id) from querycustomer")
            row = cursor.fetchone()
            if row and row[0] is not None:
                biz_id = row[0]
        except Exception:
            logger.error("get start_id list error")
            logger.error(traceback.format_exc())
        finally:
            # Return the connection to the pool on every path.
            self._release(cursor, conn)
        return biz_id

    # Save a customer that needs to be queried.
    def saveQueryCustomer(self, loan, query_ind):
        """Insert one ``querycustomer`` row for *loan*.

        Args:
            loan: mapping read with the keys ``customerName``,
                ``certificateNum``, ``businessNum``, ``customerNum``,
                ``coopBusinessNum``, ``productNum`` and ``id``.  ``None`` is
                accepted and makes the call a no-op.
            query_ind: value stored in the ``query_ind`` column.

        Failures are logged and rolled back; nothing is raised to the caller.
        """
        if loan is None:
            return
        logger.info(loan)
        conn = None
        cursor = None
        try:
            row = (
                loan["certificateNum"],       # cerf_id
                loan["customerName"],         # cust_name
                "",                           # store
                0,                            # batch_num
                query_ind,
                datetime.datetime.now(),      # query_time
                loan["id"],                   # biz_id
                loan["customerNum"],
                loan["businessNum"],
                # NOTE(review): the original comment said the API had not
                # added this field yet -- confirm it is always present.
                loan["coopBusinessNum"],
                loan["productNum"],           # product number
            )
            conn = pool.connection()
            cursor = conn.cursor()
            cursor.execute('insert into querycustomer(cerf_id,cust_name,store,batch_num,query_ind,query_time,biz_id,customer_num,business_num,coop_business_num,product_num) '
                           'values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', row)
            conn.commit()
        except Exception:
            logger.error("saveQueryCustomer error")
            logger.error(traceback.format_exc())
            # Roll back so the pooled connection is not returned mid-transaction.
            if conn is not None:
                try:
                    conn.rollback()
                except Exception:
                    logger.error(traceback.format_exc())
        finally:
            self._release(cursor, conn)
        return

    def _restoreReport(self, cust_name, cerf_id):
        """Move the customer's PDF from ``execed_new`` back into ``parsePath``."""
        descPdfPath = parsePath + "/" + cust_name + "_" + cerf_id + ".pdf"
        txtPath = parsePath + "/" + cust_name + "_" + cerf_id + ".txt"
        pdf_path = parsePath + "/execed_new/" + cust_name + "_" + cerf_id + ".pdf"
        try:
            # NOTE(review): shutil.rmtree only deletes directory trees; if
            # txtPath is a regular file the error is ignored and the file
            # stays.  Confirm whether os.remove was intended.
            shutil.rmtree(txtPath, ignore_errors=True)
            shutil.move(pdf_path, descPdfPath)
        except Exception:
            logger.error("移动文件失败" + pdf_path)

    def _pollOnce(self):
        """Run one polling cycle: fetch new applications and store them."""
        token = self.getToken()
        startId = self.getMaxBizId()
        productNums = config.get("baseconf", "productNums")
        for loan in self.getLoanApplyList(token, 1, startId, None, productNums):
            cust_name = loan["customerName"]
            cerf_id = loan["certificateNum"]
            if not dbController.checkLast1Month(cerf_id):
                self.saveQueryCustomer(loan, "0")
                continue
            # Already queried once within the last month.
            self.saveQueryCustomer(loan, "1")
            logger.info(cerf_id + "一个内已经查询过一次")
            # NOTE(review): the restore step is assumed to belong to this
            # "already queried" branch -- confirm against the original layout.
            self._restoreReport(cust_name, cerf_id)

    def run(self):
        """Poll forever, sleeping ``interval`` seconds after each cycle."""
        logger.info("征信报告查询客户服务启动...")
        while True:
            try:
                self._pollOnce()
            except Exception:
                # One bad cycle must not terminate the worker thread.
                logger.error(traceback.format_exc())
            time.sleep(interval)
|