123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236 |
- # coding=utf-8
- import os
- import uuid
- import requests
- import json
- import xmltodict
- import time
- from datetime import datetime
- from hashlib import md5
- from django.conf import settings
- from util.exceptions import CustomError
- from cryptography.exceptions import InvalidSignature, InvalidTag
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, PKCS1v15
- from cryptography.hazmat.primitives.ciphers.aead import AESGCM
- from cryptography.hazmat.primitives.hashes import SHA1, SHA256
- from cryptography.hazmat.primitives.hmac import HMAC
- from cryptography.hazmat.primitives.serialization import load_pem_private_key
- from cryptography.x509 import load_pem_x509_certificate
- from base64 import b64decode, b64encode
- class SplitAccountTool(object):
- '''直连服务商 分账 文档https://pay.weixin.qq.com/wiki/doc/apiv3/index.shtml'''
- GET = 'GET'
- POST = 'POST'
- def __init__(self, appid, mchid, private_key, cert_serial_no, apiv3_key, cert_dir=None, proxy=None):
- self._mchid = mchid # 商户号
- self._appid = appid
- self._private_key = load_private_key(private_key) # 商户证书私钥
- self._cert_serial_no = cert_serial_no # 商户证书序列号
- self._apiv3_key = apiv3_key # 商户APIv3密钥
- self._cert_dir = cert_dir # 平台证书存放目录
- self._gate_way = 'https://api.mch.weixin.qq.com'
- self._proxy = proxy
- self._certificates = []
- self._init_certificates()
- def _init_certificates(self):
- if self._cert_dir and os.path.exists(self._cert_dir):
- for file_name in os.listdir(self._cert_dir):
- if not file_name.lower().endswith('.pem'):
- continue
- with open(self._cert_dir + file_name, encoding="utf-8") as f:
- certificate = load_certificate(f.read())
- now = datetime.utcnow()
- if certificate and now >= certificate.not_valid_before and now <= certificate.not_valid_after:
- self._certificates.append(certificate)
- if not self._certificates:
- self._update_certificates()
- if not self._certificates:
- raise CustomError('没有wechatpay平台证书,请仔细检查您的初始化参数!')
- def _update_certificates(self):
- path = '/v3/certificates'
- self._certificates.clear()
- code, message = self.request(path, skip_verify=True)
- if code != 200:
- return
- data = json.loads(message).get('data')
- for value in data:
- serial_no = value.get('serial_no')
- effective_time = value.get('effective_time')
- expire_time = value.get('expire_time')
- encrypt_certificate = value.get('encrypt_certificate')
- algorithm = nonce = associated_data = ciphertext = None
- if encrypt_certificate:
- algorithm = encrypt_certificate.get('algorithm')
- nonce = encrypt_certificate.get('nonce')
- associated_data = encrypt_certificate.get('associated_data')
- ciphertext = encrypt_certificate.get('ciphertext')
- if not (serial_no and effective_time and expire_time and algorithm and nonce and associated_data and ciphertext):
- continue
- cert_str = aes_decrypt(
- nonce=nonce,
- ciphertext=ciphertext,
- associated_datformat_private_keya=associated_data,
- apiv3_key=self._apiv3_key)
- certificate = load_certificate(cert_str)
- if not certificate:
- continue
- now = datetime.utcnow()
- if now < certificate.not_valid_before or now > certificate.not_valid_after:
- continue
- self._certificates.append(certificate)
- if not self._cert_dir:
- continue
- if not os.path.exists(self._cert_dir):
- os.makedirs(self._cert_dir)
- if not os.path.exists(self._cert_dir + serial_no + '.pem'):
- f = open(self._cert_dir + serial_no + '.pem', 'w')
- f.write(cert_str)
- f.close()
- def _verify_signature(self, headers, body):
- signature = headers.get('Wechatpay-Signature')
- timestamp = headers.get('Wechatpay-Timestamp')
- nonce = headers.get('Wechatpay-Nonce')
- serial_no = headers.get('Wechatpay-Serial')
- cert_found = False
- for cert in self._certificates:
- if int('0x' + serial_no, 16) == cert.serial_number:
- cert_found = True
- certificate = cert
- break
- if not cert_found:
- self._update_certificates()
- for cert in self._certificates:
- if int('0x' + serial_no, 16) == cert.serial_number:
- cert_found = True
- certificate = cert
- break
- if not cert_found:
- return False
- if not rsa_verify(timestamp, nonce, body, signature, certificate):
- return False
- return True
- def rsa_sign(self, private_key, sign_str):
- message = sign_str.encode('UTF-8')
- signature = private_key.sign(data=message, padding=PKCS1v15(), algorithm=SHA256())
- sign = b64encode(signature).decode('UTF-8').replace('\n', '')
- return sign
- def build_authorization(self, path, method, mchid, serial_no, private_key, data=None, nonce_str=None):
- '''
- 一、构建签名串
- 签名串一共有五行,每一行为一个参数。行尾以 \n(换行符,ASCII编码值为0x0A)结束,包括最后一行。如果参数本身以\n结束,也需要附加一个\n。
- 第一步,获取HTTP请求的方法(GET, POST, PUT)等
- 第二步,获取请求的绝对URL,并去除域名部分得到参与签名的URL。如果请求中有查询参数,URL末尾应附加有'?'和对应的查询字符串。
- 第三步,获取发起请求时的系统当前时间戳,即格林威治时间1970年01月01日00时00分00秒(北京时间1970年01月01日08时00分00秒)起至现在的总秒数,作为请求时间戳。微信支付会拒绝处理很久之前发起的请求,请商户保持自身系统的时间准确。
- 第四步,生成一个请求随机串,可参见生成随机数算法。这里,我们使用命令行直接生成一个。
- 第五步,获取请求中的请求报文主体(request body)。
- 请求方法为GET时,报文主体为空。
- 当请求方法为POST或PUT时,请使用真实发送的JSON报文。
- 图片上传API,请使用meta对应的JSON报文。
- 对于下载证书的接口来说,请求报文主体是一个空串。
- 二、计算签名值
- 绝大多数编程语言提供的签名函数支持对签名数据进行签名。强烈建议商户调用该类函数,使用商户私钥对待签名串进行SHA256 with RSA签名,并对签名结果进行Base64编码得到签名值。
- 三、设置HTTP头
- 微信支付商户API V3要求通过HTTP Authorization头来传递签名 Authorization由认证类型和签名信息两部分组成
- 1.认证类型,目前为WECHATPAY2-SHA256-RSA2048
- 2.签名信息
- 发起请求的商户(包括直连商户、服务商或渠道商)的商户号 mchid
- 商户API证书序列号serial_no,用于声明所使用的证书
- 请求随机串nonce_str
- 时间戳timestamp
- 签名值signature
- 注:以上五项签名信息,无顺序要求。
- '''
- timeStamp = str(int(time.time()))
- nonce_str = nonce_str or ''.join(str(uuid.uuid4()).split('-')).upper()
- body = data if isinstance(data, str) else json.dumps(data) if data else ''
- sign_str = '%s\n%s\n%s\n%s\n%s\n' % (method, path, timeStamp, nonce_str, body)
- signature = self.rsa_sign(private_key=private_key, sign_str=sign_str)
- authorization = 'WECHATPAY2-SHA256-RSA2048 mchid="%s",nonce_str="%s",signature="%s",timestamp="%s",serial_no="%s"' % (
- mchid, nonce_str, signature, timeStamp, serial_no)
- return authorization
- def request(self, path, method=GET, data=None, skip_verify=False, sign_data=None, files=None, cipher_data=False, headers={}):
- if files:
- headers.update({'Content-Type': 'multipart/form-data'})
- else:
- headers.update({'Content-Type': 'application/json'})
- headers.update({'Accept': 'application/json'})
- authorization = self.build_authorization(path, method, self._mchid, self._cert_serial_no, self._private_key, data=sign_data if sign_data else data)
- headers.update({'Authorization': authorization})
- if method == SplitAccountTool.GET:
- response = requests.get(url=self._gate_way + path, headers=headers, proxies=self._proxy)
- elif method == SplitAccountTool.POST:
- response = requests.post(url=self._gate_way + path, json=None if files else data, data=data if files else None, headers=headers, files=files, proxies=self._proxy)
- else:
- raise CustomError('请选择正确的请求方式!')
- if response.status_code in range(200, 300) and not skip_verify:
- if not self._verify_signature(response.headers, response.text):
- raise CustomError('签名验证失败!')
- return response.status_code, response.text if 'application/json' in response.headers.get('Content-Type') else response.content
- def format_private_key(private_key_str):
- pem_start = '-----BEGIN PRIVATE KEY-----\n'
- pem_end = '\n-----END PRIVATE KEY-----'
- if not private_key_str.startswith(pem_start):
- private_key_str = pem_start + private_key_str
- if not private_key_str.endswith(pem_end):
- private_key_str = private_key_str + pem_end
- return private_key_str
- def load_private_key(private_key_str):
- try:
- return load_pem_private_key(data=format_private_key(private_key_str).encode('UTF-8'), password=None, backend=default_backend())
- except:
- raise CustomError('商户证书私钥加载失败!')
- def rsa_verify(timestamp, nonce, body, signature, certificate):
- '''验证签名'''
- sign_str = '%s\n%s\n%s\n' % (timestamp, nonce, body)
- public_key = certificate.public_key()
- message = sign_str.encode('UTF-8')
- signature = b64decode(signature)
- try:
- public_key.verify(signature, message, PKCS1v15(), SHA256())
- except InvalidSignature:
- return False
- return True
- def load_certificate(certificate_str):
- try:
- return load_pem_x509_certificate(data=certificate_str.encode('UTF-8'), backend=default_backend())
- except:
- return None
- def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key):
- '''回调信息解密'''
- key_bytes = apiv3_key.encode('UTF-8')
- nonce_bytes = nonce.encode('UTF-8')
- associated_data_bytes = associated_data.encode('UTF-8')
- data = b64decode(ciphertext)
- aesgcm = AESGCM(key=key_bytes)
- try:
- result = aesgcm.decrypt(nonce=nonce_bytes, data=data, associated_data=associated_data_bytes).decode('UTF-8')
- except InvalidTag:
- result = None
- return result
|