灵活运用Pyhon语言来提高我们的工作效率
一、用python来请求接口
虽然现在的接口请求工具比较多,比如postman,或者jmeter ,或者apifox等等,但是通过python来做接口请求也是蛮过瘾的
# coding:utf-8
import requests
import json
if __name__ == '__main__':
Post_url = "http://www-uat.perfect99.com/perfect-mall-application/v1/user/register/getVerifyCode"
headers = {'Content-Type': 'application/json;charset=UTF-8',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36'}
Post_data = {"phone": "17630009998", "type": 1, "descritionType": 2}
response = requests.post(Post_url, data=json.dumps(Post_data), headers=headers)
print(response.text)
json_str = eval(json.dumps(response.text))
data = json.loads(json_str)
if data["resultCode"] == "0" and data['resultMsg'] == "success":
print("success")
else:
print("fail")
二、两数据库两表字段对比
import pymysql
import logging
# 设定日志级别
logging.basicConfig(
level=logging.DEBUG
)
# 旧数据库
conn_old = pymysql.connect(host="172.100.29.247",
user="perfect_w",
password="XXXXX",
db="perfect_stage_center_inventory",
port=3306,
charset="utf8")
# 新数据库
conn_new = pymysql.connect(host="172.100.29.247",
user="perfect_w",
password="XXXXX",
db="perfect_stage_center_inventory",
port=3306,
charset="utf8")
# 新旧表字段存放在二维列表中
def db_diff(tb_new, tb_old, *tb_field):
"""
:param tb_new: 新表
:param tb_old: 旧表
:param tb_field: [[新表中字段,],[对应旧表中的字段,]]
:return: 返回新旧表中的数据总量,以及旧表中存在,但是在新表中没有找到的数据
"""
# 校验数据总量是否一致
cmp_new_sql = "select count(*) from " + tb_new + ";"
cmp_old_sql = "select count(*) from " + tb_old + ";"
logging.debug(cmp_new_sql)
logging.debug(cmp_old_sql)
cursor_new = conn_new.cursor()
cursor_old = conn_old.cursor()
cursor_new.execute(cmp_new_sql)
cursor_old.execute(cmp_old_sql)
new_num = cursor_new.fetchone()
old_num = cursor_old.fetchone()
print(new_num)
print(old_num)
if new_num == old_num:
logging.info(tb_new + "和" + tb_old + "数据量相同:" + str(new_num))
else:
logging.error({tb_new + "_new": new_num[0], tb_old: old_num[0]})
# 校验各字段值是否一致
field_new = ", ".join(tb_field[0][0])
field_old = ", ".join(tb_field[0][1])
cmp_dt_new_sql = "select " + field_new + " from " + tb_new + ";"
cmp_dt_old_sql = "select " + field_old + " from " + tb_old + ";"
print("cmp_dt_new_sql:",cmp_dt_new_sql)
print("cmp_dt_old_sql:",cmp_dt_old_sql)
logging.debug(cmp_dt_new_sql)
logging.debug(cmp_dt_old_sql)
cursor_new.execute(cmp_dt_new_sql)
cursor_old.execute(cmp_dt_old_sql)
new_dt = cursor_new.fetchall()
old_dt = cursor_old.fetchall()
# logging.debug(list(new_dt))
# logging.debug(list(old_dt))
count = 0
for item in old_dt:
print(item)
if item in new_dt:
pass
else:
logging.error(tb_new + "新表中未找到:" + str(item))
count += 1
logging.error("总数: %d" % count)
logging.error("\n\n")
# 关闭游标
cursor_old.close()
cursor_new.close()
return
if __name__ == '__main__':
# 测试表
test_table_field = [["shop_code","item_code","last_balance"],["shop_code","item_code","last_balance"]]
db_diff("in_storage_report_new", "in_storage_last_balance", test_table_field)
# 关闭数据库连接
conn_old.close()
conn_new.close()
三、两Excel文件数据对比
# -*- coding: utf-8 -*-
# 比对两个Excel文件内容的差异
# ---------------------假设条件----------------
# 1、源表和目标表格式一致
# 2、不存在合并单元格
# 3、第2行开始比对
# ---------------------------------------------
import xlrd
import time # 引入time模块
# 往日志文件中追加内容函数
def writeappend_logfile(filename, content):
file = open(filename, 'a') # 以追加方式打开日志文件
time_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 系统时间格式化
file.writelines(time_now + ':' + str(content) + '\n') # 写入内容
file.close() # 关闭文件
def read_excel(ori_path, tar_path, sub_name): #
print("ori_path:", ori_path)
print("tar_path:", tar_path)
success = 0 # 匹配一致数量
fail = 0 # 匹配不一致数量
origin_xls = {} # 存储源xls文件
target_xls = {} # 比对的xls文件
wb_ori = xlrd.open_workbook(ori_path) # 打开原始文件
wb_tar = xlrd.open_workbook(tar_path) # 打开目标文件
sheet_num = len(wb_ori.sheets()) # 源表子表数量
startime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # 获取系统当前时间并格式化为格式
print(startime, ' 开始比对......')
logname = 'log_' + startime[0:10] + '.log' # 截取日期年月日构成日志文件名
logfile = open(logname, 'w') # 创建日志文件,如果文件存在则清空内容,不存在则创建,如果需要同时批量比对多张表,可以考虑将日志文件名作为参数传入
logfile.writelines(startime + ':【开始比对】......' + '\n') # 日志文件写入开始比对时间
logfile.close() # 关闭日志文件
try:
sheet_ori = wb_ori.sheet_by_name(sub_name)
print("sheet_ori.name:", sheet_ori.name)
sheet_tar = wb_tar.sheet_by_name(sub_name)
print("sheet_tar.name:", sheet_tar.name)
if sheet_ori.name == sheet_tar.name:
# sheet表名
if sheet_ori.name == sub_name:
# 先将数存入dictionary中dictionary(rows:list)
# 第一行存储表头
# 源表取一行数据与目标表全表进行比对如果表中存在主键可以用主键进行索引
# 数据从excel第3行开始
for rows in range(0, sheet_ori.nrows):
orign_list = sheet_ori.row_values(rows) # 源表i行数据
origin_xls[rows] = orign_list # 源表写入字典
# print("origin_xls[rows]:", origin_xls[rows])
for rows in range(0, sheet_tar.nrows):
target_list = sheet_tar.row_values(rows) # 目标表i行数据
target_xls[rows] = target_list # 目标表写入字典
# print("target_xls[rows]", target_xls[rows])
if origin_xls[0] == target_xls[0]:
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 表头一致')
num = len(origin_xls)
# print("num:", str(num))
num1 = len(target_xls)
# print("num1:", str(num1))
if num >= num1:
for ori_num in origin_xls:
# print("ori_num:", str(ori_num))
flag = 'false' # 判断是否一致标志
for tar_num in target_xls:
if origin_xls[ori_num] == target_xls[tar_num]:
flag = 'true'
break # 如果匹配到结果退出循环
if flag == 'true': # 匹配上结果输出后台日志
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 文件:', ori_path + ' ' + 'row:%d is ok' % (ori_num + 1))
success += 1
else: # 匹配不上将源表中行记录写入txt
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 文件:',ori_path + ' ' + 'row:%d is different' % (ori_num + 1))
fail += 1
data = origin_xls[ori_num]
logstr = '文件:', str(ori_path) + ' ' + '【不一致】row<' + str(ori_num) + '>:' + str(data)
writeappend_logfile(logname, logstr)
logstr = '【比对完成】总记录数:{:d}条,一致:{:d}条,不一致:{:d}条'.format(ori_num + 1, success, fail)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 【%s】比对结束' % sheet_ori.name)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 总记录数:%d条,一致:%d条,不一致:%d条' % (ori_num + 1, success, fail))
writeappend_logfile(logname, logstr)
else:
for tar_num in target_xls:
flag = 'false' # 判断是否一致标志
for ori_num in origin_xls:
if target_xls[tar_num] == origin_xls[ori_num]:
flag = 'true'
break # 如果匹配到结果退出循环
if flag == 'true': # 匹配上结果输出后台日志
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 文件: ',tar_path + ' ' + ' row:%d is ok' % (tar_num + 1))
success += 1
else: # 匹配不上将源表中行记录写入txt
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 文件: ',tar_path + ' ' + ' row:%d is different' % (tar_num + 1))
fail += 1
data = target_xls[tar_num]
logstr = '【不一致】row<' + str(tar_num + 1) + '>:' + str(data)
writeappend_logfile(logname, logstr)
logstr = '【比对完成】总记录数:{:d}条,一致:{:d}条,不一致:{:d}条'.format(tar_num + 1, success, fail)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 【%s】比对结束' % sheet_tar.name)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + ' 总记录数:%d条,一致:%d条,不一致:%d条' % (tar_num + 1, success, fail))
writeappend_logfile(logname, logstr)
else:
errmsg = '【' + sub_name + '】子表名不一致'
writeappend_logfile(logname, errmsg)
except Exception as err:
writeappend_logfile(logname, str(err)) # 输出异常
if __name__ == '__main__':
read_excel(r'业务库存对账全量1_V1.1.xlsx', '业务库存对账全量2_V1.1.xlsx', '汇总')
正文到此结束