"""Database and token-service helpers backed by a pooled PyMySQL connection.

Importing this module has side effects: it reads ``config.ini`` from the
directory containing this file and creates the module-level connection
``pool`` that :class:`DbController` uses.
"""
import json
import os
import random
import string
import sys
import threading
import time
import traceback

import pymysql
import requests
from DBUtils.PooledDB import PooledDB

import logApi
from ini_op import Config

logger = logApi.logger

base_dir = os.path.dirname(os.path.abspath(__file__))
config = Config(base_dir + "/config.ini")
tokenApiUrl = config.get("baseconf", "tokenApiUrl")
pwd = config.get("baseconf", "pwd")
db = config.get("baseconf", "db")

# NOTE(review): the database password is hard-coded here, while ``pwd`` read
# from config.ini is only used by the commented-out alternative below.
# Credentials should live in configuration, not in source control; the literal
# is left untouched because switching it could break the current deployment.
# The ``10`` is PooledDB's second positional argument (``mincached`` according
# to the DBUtils documentation -- confirm for the installed version).
pool = PooledDB(pymysql, 10, host='localhost', user='root', passwd='wdshy1gxf', db=db, port=3306, charset="utf8")
# pool = PooledDB(pymysql,10,host='10.100.81.179',user='root',passwd=pwd,db=db,port=3306,charset="utf8");


class DbController():
    """Access helpers for the ``querycustomer`` table and the token API.

    The lookup/update helpers borrow their own connection from ``pool`` and
    return it before they finish. ``getConn`` / ``execSql`` / ``commit`` form
    a separate, caller-driven transaction that lives on ``self.conn`` and
    ``self.cursor``; the lookup helpers deliberately do not touch those
    attributes so they cannot clobber a transaction in progress.
    """

    def getToken(self):
        """Request an access token from ``tokenApiUrl``.

        (Original author's note, translated: "seems to be the authentication
        token?")

        Returns:
            The ``access_token`` string from the response when the response's
            ``errcode`` equals the string ``'0'``; otherwise ``""``. Request
            and parsing failures are logged and also yield ``""``.
        """
        token = ""
        headers = {
            "Content-Type": "application/json",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
        }
        alphabet = string.ascii_letters + string.digits
        param = {
            "grant_type": config.get("baseconf", "grant_type"),
            "client_id": config.get("baseconf", "client_id"),
            "client_secret": config.get("baseconf", "client_secret"),
            "tenant_id": config.get("baseconf", "tenant_id"),
            "timestamp": int(round(time.time() * 1000)),
            # SystemRandom draws from the OS entropy source; sample() keeps
            # the original "16 distinct characters" shape of the nonce.
            "nonce": ''.join(random.SystemRandom().sample(alphabet, 16)),
        }
        print("发送报文")
        print(headers)
        # Print a redacted copy so the client secret never reaches stdout.
        print(dict(param, client_secret="***"))
        try:
            result = requests.post(tokenApiUrl, json=param, headers=headers, timeout=10)
            print("result是")
            print(result)
            data = json.loads(result.text)
            errorCode = data["errcode"]
            if errorCode == '0':
                token = data["data"]["access_token"]
            # NOTE(review): this still prints the token itself to stdout.
            print("token是:" + str(token))
        except Exception:
            logger.error("获取token失败 ")
            logger.error(traceback.format_exc())
        return token

    @staticmethod
    def _release(cursor, conn):
        """Close ``cursor`` then ``conn``, skipping whichever is ``None``."""
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always hand the connection back, even if closing the cursor fails.
            if conn is not None:
                conn.close()

    def _fetch_one(self, sql, params, caller):
        """Run a parameterized SELECT and return its first row.

        Args:
            sql: Query text using ``%s`` placeholders.
            params: Sequence of values bound to the placeholders.
            caller: Name of the public method, used in the error log line.

        Returns:
            The first row as returned by ``fetchone()``, or ``None`` when the
            query matched nothing or failed (failures are logged).
        """
        conn = None
        cursor = None
        try:
            conn = pool.connection()
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.fetchone()
        except Exception:
            logger.error(" " + caller + " error")
            logger.error(traceback.format_exc())
            return None
        finally:
            self._release(cursor, conn)

    def _latest_value(self, sql, cerf_id, caller):
        """Return column 0 of the first row for ``cerf_id``, or ``""``."""
        row = self._fetch_one(sql, (cerf_id,), caller)
        if row:
            return row[0]
        return ""

    def getBussinessNum(self, cerf_id):
        """Return ``business_num`` of the row with the highest ``biz_id``.

        Returns ``""`` when no row matches ``cerf_id`` or the query fails.
        """
        sql = "select business_num from querycustomer where cerf_id=%s order by biz_id desc"
        return self._latest_value(sql, cerf_id, "getBussinessNum")

    def updateParseInd(self, cerf_id, parseInd):
        """Set ``parse_ind`` to ``parseInd`` on every row matching ``cerf_id``.

        Failures are logged and swallowed, as before; nothing is returned.
        """
        conn = None
        cursor = None
        try:
            conn = pool.connection()
            cursor = conn.cursor()
            sql = "update querycustomer set parse_ind=%s where cerf_id=%s"
            cursor.execute(sql, (parseInd, cerf_id))
            conn.commit()
        except Exception:
            logger.error(" updateParseInd error")
            logger.error(traceback.format_exc())
        finally:
            self._release(cursor, conn)

    def getParseInd(self, cerf_id):
        """Return ``parse_ind`` of the row with the highest ``biz_id``.

        Returns ``""`` when no row matches ``cerf_id`` or the query fails.
        """
        sql = "select parse_ind from querycustomer where cerf_id=%s order by biz_id desc"
        return self._latest_value(sql, cerf_id, "getParseInd")

    def getProductNum(self, cerf_id):
        """Return ``product_num`` of the row with the highest ``biz_id``.

        Returns ``""`` when no row matches ``cerf_id`` or the query fails.
        """
        sql = "select product_num from querycustomer where cerf_id=%s order by biz_id desc"
        return self._latest_value(sql, cerf_id, "getProductNum")

    def getCoopBussinessNum(self, cerf_id):
        """Return ``coop_business_num`` of the row with the highest ``biz_id``.

        Returns ``""`` when no row matches ``cerf_id`` or the query fails.
        """
        sql = "select coop_business_num from querycustomer where cerf_id=%s order by biz_id desc"
        return self._latest_value(sql, cerf_id, "getCoopBussinessNum")

    def checkLast1Month(self, cerf_id):
        """Tell whether ``cerf_id`` has a row whose ``query_time`` date falls
        within the last month (evaluated by the database via ``DATE_SUB``).

        Returns ``False`` when there is no such row or the query fails.
        """
        sql = (
            "select coop_business_num from querycustomer where cerf_id=%s"
            " and DATE_SUB(CURDATE(), INTERVAL 1 MONTH) <= date(query_time)"
        )
        row = self._fetch_one(sql, (cerf_id,), "checkLast1Month")
        return bool(row)

    def getCustomerNum(self, cerf_id):
        """Return ``customer_num`` of the row with the highest ``biz_id``.

        Returns ``""`` when no row matches ``cerf_id`` or the query fails.
        """
        sql = "select customer_num from querycustomer where cerf_id=%s order by biz_id desc"
        return self._latest_value(sql, cerf_id, "getCustomerNum")

    def execSql(self, sql, params=None):
        """Execute ``sql`` on the cursor opened by :meth:`getConn`.

        Nothing is committed or closed here; call :meth:`commit` afterwards.

        Args:
            sql: Statement text. When ``params`` is given it should use
                ``%s`` placeholders instead of concatenated values.
            params: Optional sequence of values bound to the placeholders.

        Failures (including calling this before ``getConn``) are logged
        together with the statement and swallowed.
        """
        try:
            if params is None:
                self.cursor.execute(sql)
            else:
                self.cursor.execute(sql, params)
        except Exception:
            # The log text still says "saveNode"; it is kept verbatim so
            # existing log searches keep matching.
            logger.error(" saveNode error")
            logger.error(traceback.format_exc())
            logger.info(sql)

    def getConn(self):
        """Borrow a pooled connection and open a cursor for ``execSql``."""
        self.conn = pool.connection()
        self.cursor = self.conn.cursor()

    def commit(self):
        """Commit the ``getConn`` transaction and release its resources.

        Raises whatever ``commit()`` raises, but the cursor and connection
        are released either way so the pool does not leak connections.
        """
        try:
            self.conn.commit()
        finally:
            self._release(getattr(self, "cursor", None), getattr(self, "conn", None))