# coding=utf-8
import os
import uuid
import requests
import json
import xmltodict
import time
from datetime import datetime
from hashlib import md5
from django.conf import settings
from util.exceptions import CustomError
from cryptography.exceptions import InvalidSignature, InvalidTag
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, PKCS1v15
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.hashes import SHA1, SHA256
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.x509 import load_pem_x509_certificate
from base64 import b64decode, b64encode


class SplitAccountTool(object):
    '''WeChat Pay API v3 client used for profit sharing (direct service provider).

    Signs outgoing requests with the merchant private key and verifies
    responses against the WeChat Pay platform certificates, which are cached
    in memory and (optionally) on disk.

    Documentation: https://pay.weixin.qq.com/wiki/doc/apiv3/index.shtml
    '''

    GET = 'GET'
    POST = 'POST'

    def __init__(self, appid, mchid, cert_serial_no, apiv3_key, proxy=None):
        '''Load the merchant key and the platform certificates.

        Args:
            appid: application id, stored on the instance.
            mchid: merchant id placed in the Authorization header.
            cert_serial_no: serial number of the merchant API certificate.
            apiv3_key: merchant APIv3 key used to decrypt platform certificates.
            proxy: optional ``proxies`` mapping handed to ``requests``.

        Raises:
            CustomError: if the private key cannot be loaded or no valid
                platform certificate is available.
        '''
        self._mchid = mchid  # merchant id
        self._appid = appid
        self._private_key = self._innt_private_key()  # merchant certificate private key
        self._cert_serial_no = cert_serial_no  # merchant certificate serial number
        self._apiv3_key = apiv3_key  # merchant APIv3 key
        self._cert_dir = settings.PUBLIC_CERT_ROOT  # directory holding platform certificates
        self._gate_way = 'https://api.mch.weixin.qq.com'
        self._proxy = proxy
        self._certificates = []
        self._init_certificates()

    def _innt_private_key(self):
        '''Read ``apiclient_key.pem`` from ``settings.PRIVATE_CERT_ROOT`` and parse it.

        The method name (sic) is kept unchanged for compatibility.
        '''
        # os.path.join yields the same path as the former string concatenation
        # when the setting ends with a separator, and a correct one when it
        # does not.
        key_path = os.path.join(settings.PRIVATE_CERT_ROOT, 'apiclient_key.pem')
        with open(key_path) as f:
            return load_private_key(f.read())

    def _init_certificates(self):
        '''Populate the certificate cache from disk, falling back to a download.

        Raises:
            CustomError: if no currently valid platform certificate was found.
        '''
        if self._cert_dir and os.path.exists(self._cert_dir):
            for file_name in os.listdir(self._cert_dir):
                if not file_name.lower().endswith('.pem'):
                    continue
                with open(os.path.join(self._cert_dir, file_name), encoding='utf-8') as f:
                    certificate = load_certificate(f.read())
                if certificate and _is_currently_valid(certificate):
                    self._certificates.append(certificate)
        if not self._certificates:
            self._update_certificates()
        if not self._certificates:
            raise CustomError('没有wechatpay平台证书,请仔细检查您的初始化参数!')

    def _update_certificates(self):
        '''Download the platform certificates and replace the in-memory cache.

        Certificates that cannot be decrypted, cannot be parsed, or are outside
        their validity window are skipped. When ``self._cert_dir`` is set, each
        accepted certificate is also written to ``<serial_no>.pem`` there
        unless that file already exists.

        The cache is only replaced after a successful (HTTP 200) download, so a
        failed refresh does not discard certificates that were already loaded.
        '''
        # The certificate list itself cannot be verified with certificates we
        # may not have yet, hence skip_verify=True.
        code, message = self.request('/v3/certificates', skip_verify=True)
        if code != 200:
            return
        entries = json.loads(message).get('data') or []
        fresh = []
        for value in entries:
            serial_no = value.get('serial_no')
            effective_time = value.get('effective_time')
            expire_time = value.get('expire_time')
            encrypt_certificate = value.get('encrypt_certificate') or {}
            algorithm = encrypt_certificate.get('algorithm')
            nonce = encrypt_certificate.get('nonce')
            associated_data = encrypt_certificate.get('associated_data')
            ciphertext = encrypt_certificate.get('ciphertext')
            if not (serial_no and effective_time and expire_time and algorithm
                    and nonce and associated_data and ciphertext):
                continue
            cert_str = aes_decrypt(
                nonce=nonce,
                ciphertext=ciphertext,
                associated_data=associated_data,
                apiv3_key=self._apiv3_key)
            if not cert_str:
                # Authentication tag mismatch: nothing usable to parse.
                continue
            certificate = load_certificate(cert_str)
            if not certificate or not _is_currently_valid(certificate):
                continue
            fresh.append(certificate)
            if not self._cert_dir:
                continue
            os.makedirs(self._cert_dir, exist_ok=True)
            cert_path = os.path.join(self._cert_dir, serial_no + '.pem')
            if not os.path.exists(cert_path):
                with open(cert_path, 'w', encoding='utf-8') as f:
                    f.write(cert_str)
        # Slice assignment keeps the identity of the cached list.
        self._certificates[:] = fresh

    def _find_certificate(self, serial_number):
        '''Return the cached certificate with this integer serial number, or None.'''
        for cert in self._certificates:
            if cert.serial_number == serial_number:
                return cert
        return None

    def _verify_signature(self, headers, body):
        '''Verify the ``Wechatpay-*`` signature headers of a response.

        Args:
            headers: response headers (mapping with ``get``).
            body: response body text that was signed.

        Returns:
            True if the signature is valid for a known platform certificate.
            False if any signature header is missing or malformed, the
            certificate is unknown even after a refresh, or the signature does
            not match.
        '''
        signature = headers.get('Wechatpay-Signature')
        timestamp = headers.get('Wechatpay-Timestamp')
        nonce = headers.get('Wechatpay-Nonce')
        serial_no = headers.get('Wechatpay-Serial')
        if not (signature and timestamp and nonce and serial_no):
            return False
        try:
            serial_number = int(serial_no, 16)
        except ValueError:
            return False
        certificate = self._find_certificate(serial_number)
        if certificate is None:
            # Unknown serial: the platform certificate may have been rotated.
            self._update_certificates()
            certificate = self._find_certificate(serial_number)
            if certificate is None:
                return False
        return rsa_verify(timestamp, nonce, body, signature, certificate)

    def rsa_sign(self, private_key, sign_str):
        '''Sign ``sign_str`` with SHA256-with-RSA (PKCS#1 v1.5).

        Returns:
            The Base64-encoded signature as a ``str``.
        '''
        signature = private_key.sign(
            data=sign_str.encode('UTF-8'), padding=PKCS1v15(), algorithm=SHA256())
        # b64encode never emits line breaks, so no post-processing is needed.
        return b64encode(signature).decode('UTF-8')

    def build_authorization(self, path, method, mchid, serial_no, private_key, data=None, nonce_str=None):
        '''Build the value of the HTTP ``Authorization`` header.

        1. Build the string to sign. It has five lines, one parameter per
           line, each terminated by a line feed (LF, 0x0A), including the
           last one. If a parameter itself ends with LF, one more is appended.
             - the HTTP method (GET, POST, PUT, ...);
             - the absolute request URL without the domain part; when the
               request has query parameters, '?' and the query string are
               appended;
             - the current Unix timestamp in seconds. WeChat Pay rejects
               requests issued long ago, so keep the system clock accurate;
             - a random request nonce;
             - the request body: empty for GET; the JSON actually sent for
               POST/PUT; the JSON of ``meta`` for the image upload API; an
               empty string for the certificate download API.
        2. Compute the signature: sign the string with the merchant private
           key using SHA256 with RSA and Base64-encode the result.
        3. Set the HTTP header. ``Authorization`` consists of the
           authentication type (currently WECHATPAY2-SHA256-RSA2048) and the
           signature information: merchant id ``mchid``, merchant API
           certificate serial number ``serial_no``, ``nonce_str``,
           ``timestamp`` and ``signature``. The order of these five items
           does not matter.

        Args:
            path: URL path (with query string) that is being requested.
            method: HTTP method name.
            mchid: merchant id.
            serial_no: merchant API certificate serial number.
            private_key: loaded merchant private key.
            data: request body; a ``str`` is signed as is, any other truthy
                value is serialized with ``json.dumps``, a falsy value is
                signed as an empty body.
            nonce_str: optional nonce; a random one is generated when omitted.

        Returns:
            The complete ``Authorization`` header value.
        '''
        timestamp = str(int(time.time()))
        nonce_str = nonce_str or uuid.uuid4().hex.upper()
        body = _serialize_body(data)
        sign_str = '%s\n%s\n%s\n%s\n%s\n' % (method, path, timestamp, nonce_str, body)
        signature = self.rsa_sign(private_key=private_key, sign_str=sign_str)
        return ('WECHATPAY2-SHA256-RSA2048 mchid="%s",nonce_str="%s",signature="%s",'
                'timestamp="%s",serial_no="%s"') % (mchid, nonce_str, signature, timestamp, serial_no)

    def request(self, path, method=GET, data=None, skip_verify=False, sign_data=None, files=None,
                cipher_data=False, headers=None):
        '''Send a signed request to the WeChat Pay gateway.

        Args:
            path: URL path (with query string) appended to the gateway host.
            method: ``SplitAccountTool.GET`` or ``SplitAccountTool.POST``.
            data: request payload. Without ``files`` it is sent as the JSON
                body; with ``files`` it is passed to ``requests`` as form data.
            skip_verify: when True the response signature is not checked.
            sign_data: payload to sign instead of ``data`` (for example the
                ``meta`` JSON of an upload).
            files: optional ``files`` argument for ``requests.post``.
            cipher_data: accepted for interface compatibility; not used by
                this method.
            headers: optional extra request headers. The mapping is copied and
                never modified.

        Returns:
            ``(status_code, body)`` where ``body`` is the response text for
            JSON responses and the raw bytes otherwise.

        Raises:
            CustomError: for an unsupported ``method`` or when the signature
                of a 2xx response cannot be verified.
        '''
        # Work on a copy: a shared default dict or the caller's dict must not
        # accumulate headers from previous calls.
        headers = dict(headers) if headers else {}
        headers['Content-Type'] = 'multipart/form-data' if files else 'application/json'
        headers['Accept'] = 'application/json'

        # Serialize once so that the bytes we sign are exactly the bytes we send.
        body = _serialize_body(data)
        headers['Authorization'] = self.build_authorization(
            path, method, self._mchid, self._cert_serial_no, self._private_key,
            data=sign_data if sign_data else body)

        url = self._gate_way + path
        if method == SplitAccountTool.GET:
            response = requests.get(url=url, headers=headers, proxies=self._proxy)
        elif method == SplitAccountTool.POST:
            if files:
                payload = data
            else:
                # Encode explicitly; a str body would otherwise be encoded as
                # latin-1 by the HTTP layer.
                payload = body.encode('UTF-8') if body else None
            response = requests.post(url=url, data=payload, headers=headers, files=files,
                                     proxies=self._proxy)
        else:
            raise CustomError('请选择正确的请求方式!')

        if 200 <= response.status_code < 300 and not skip_verify:
            if not self._verify_signature(response.headers, response.text):
                raise CustomError('签名验证失败!')

        content_type = response.headers.get('Content-Type') or ''
        if 'application/json' in content_type:
            return response.status_code, response.text
        return response.status_code, response.content


def _serialize_body(data):
    '''Return the request body string that is signed (and sent) for ``data``.'''
    if isinstance(data, str):
        return data
    return json.dumps(data) if data else ''


def _is_currently_valid(certificate):
    '''Return True if the current UTC time lies within the certificate validity window.'''
    # not_valid_before / not_valid_after are naive datetimes, so compare
    # against a naive UTC "now".
    now = datetime.utcnow()
    return certificate.not_valid_before <= now <= certificate.not_valid_after


def format_private_key(private_key_str):
    '''Wrap key material in PKCS#8 PEM markers unless they are already present.

    Surrounding whitespace is removed first so that a key file ending with a
    newline is not given a second END marker.
    '''
    pem_start = '-----BEGIN PRIVATE KEY-----\n'
    pem_end = '\n-----END PRIVATE KEY-----'
    private_key_str = private_key_str.strip()
    if not private_key_str.startswith(pem_start):
        private_key_str = pem_start + private_key_str
    if not private_key_str.endswith(pem_end):
        private_key_str = private_key_str + pem_end
    return private_key_str


def load_private_key(private_key_str):
    '''Parse the merchant private key from PEM text.

    Raises:
        CustomError: if the key cannot be loaded; the original error is chained.
    '''
    try:
        return load_pem_private_key(
            data=format_private_key(private_key_str).encode('UTF-8'),
            password=None,
            backend=default_backend())
    except Exception as err:
        raise CustomError('商户证书私钥加载失败!') from err


def rsa_verify(timestamp, nonce, body, signature, certificate):
    '''Verify a response signature.

    The signed message is ``timestamp``, ``nonce`` and ``body``, each followed
    by a line feed. ``signature`` is the Base64 text from the response header.

    Returns:
        True if the signature matches the certificate's public key; False if
        it does not match or is not valid Base64.
    '''
    sign_str = '%s\n%s\n%s\n' % (timestamp, nonce, body)
    try:
        certificate.public_key().verify(
            b64decode(signature), sign_str.encode('UTF-8'), PKCS1v15(), SHA256())
    except (InvalidSignature, ValueError):
        # binascii.Error (bad Base64) is a ValueError subclass.
        return False
    return True


def load_certificate(certificate_str):
    '''Parse a PEM X.509 certificate; return None when it cannot be loaded.'''
    if not certificate_str:
        return None
    try:
        return load_pem_x509_certificate(
            data=certificate_str.encode('UTF-8'), backend=default_backend())
    except Exception:
        # Best effort by design: callers skip certificates that fail to parse.
        return None


def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key):
    '''Decrypt an AES-GCM encrypted payload (callback data or certificate).

    Args:
        nonce: nonce string sent with the payload.
        ciphertext: Base64-encoded ciphertext including the authentication tag.
        associated_data: additional authenticated data string.
        apiv3_key: merchant APIv3 key.

    Returns:
        The decrypted text, or None if the authentication tag does not match.
    '''
    aesgcm = AESGCM(key=apiv3_key.encode('UTF-8'))
    try:
        plaintext = aesgcm.decrypt(
            nonce=nonce.encode('UTF-8'),
            data=b64decode(ciphertext),
            associated_data=associated_data.encode('UTF-8'))
    except InvalidTag:
        return None
    return plaintext.decode('UTF-8')