# -*- coding: utf-8 -*- import json import time import uuid from base64 import b64decode, b64encode from cryptography.exceptions import InvalidSignature, InvalidTag from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, PKCS1v15 from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.hashes import SHA1, SHA256, SM3, Hash from cryptography.hazmat.primitives.hmac import HMAC from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.x509 import load_pem_x509_certificate def build_authorization(path, method, mchid, serial_no, private_key, data=None, nonce_str=None): ''' 一、构建签名串 签名串一共有五行,每一行为一个参数。行尾以 \n(换行符,ASCII编码值为0x0A)结束,包括最后一行。如果参数本身以\n结束,也需要附加一个\n。 第一步,获取HTTP请求的方法(GET, POST, PUT)等 第二步,获取请求的绝对URL,并去除域名部分得到参与签名的URL。如果请求中有查询参数,URL末尾应附加有'?'和对应的查询字符串。 第三步,获取发起请求时的系统当前时间戳,即格林威治时间1970年01月01日00时00分00秒(北京时间1970年01月01日08时00分00秒)起至现在的总秒数,作为请求时间戳。微信支付会拒绝处理很久之前发起的请求,请商户保持自身系统的时间准确。 第四步,生成一个请求随机串,可参见生成随机数算法。这里,我们使用命令行直接生成一个。 第五步,获取请求中的请求报文主体(request body)。 请求方法为GET时,报文主体为空。 当请求方法为POST或PUT时,请使用真实发送的JSON报文。 图片上传API,请使用meta对应的JSON报文。 对于下载证书的接口来说,请求报文主体是一个空串。 二、计算签名值 绝大多数编程语言提供的签名函数支持对签名数据进行签名。强烈建议商户调用该类函数,使用商户私钥对待签名串进行SHA256 with RSA签名,并对签名结果进行Base64编码得到签名值。 三、设置HTTP头 微信支付商户API V3要求通过HTTP Authorization头来传递签名 Authorization由认证类型和签名信息两部分组成 1.认证类型,目前为WECHATPAY2-SHA256-RSA2048 2.签名信息 发起请求的商户(包括直连商户、服务商或渠道商)的商户号 mchid 商户API证书序列号serial_no,用于声明所使用的证书 请求随机串nonce_str 时间戳timestamp 签名值signature 注:以上五项签名信息,无顺序要求。 :param path: :param method: :param mchid: :param serial_no: :param private_key: :param data: :param nonce_str: :return: ''' timeStamp = str(int(time.time())) nonce_str = nonce_str or ''.join(str(uuid.uuid4()).split('-')).upper() body = data if isinstance(data, str) else json.dumps(data) if data else '' sign_str = '%s\n%s\n%s\n%s\n%s\n' % (method, path, timeStamp, nonce_str, body) signature = rsa_sign(private_key=private_key, sign_str=sign_str) authorization = 'WECHATPAY2-SHA256-RSA2048 mchid="%s",nonce_str="%s",signature="%s",timestamp="%s",serial_no="%s"' % (mchid, nonce_str, signature, timeStamp, serial_no) return authorization def rsa_sign(private_key, sign_str): message = sign_str.encode('UTF-8') signature = private_key.sign(data=message, padding=PKCS1v15(), algorithm=SHA256()) sign = b64encode(signature).decode('UTF-8').replace('\n', '') return sign def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key): key_bytes = apiv3_key.encode('UTF-8') nonce_bytes = nonce.encode('UTF-8') associated_data_bytes = associated_data.encode('UTF-8') data = b64decode(ciphertext) aesgcm = AESGCM(key=key_bytes) try: result = aesgcm.decrypt(nonce=nonce_bytes, data=data, associated_data=associated_data_bytes).decode('UTF-8') except InvalidTag: result = None return result def format_private_key(private_key_str): pem_start = '-----BEGIN PRIVATE KEY-----\n' pem_end = '\n-----END PRIVATE KEY-----' if not private_key_str.startswith(pem_start): private_key_str = pem_start + private_key_str if not private_key_str.endswith(pem_end): private_key_str = private_key_str + pem_end return private_key_str def load_certificate(certificate_str): try: return load_pem_x509_certificate(data=certificate_str.encode('UTF-8'), backend=default_backend()) except: return None def load_private_key(private_key_str): try: return load_pem_private_key(data=format_private_key(private_key_str).encode('UTF-8'), password=None, backend=default_backend()) except: raise Exception('商户证书私钥加载失败!') def rsa_verify(timestamp, nonce, body, signature, certificate): sign_str = '%s\n%s\n%s\n' % (timestamp, nonce, body) public_key = certificate.public_key() message = sign_str.encode('UTF-8') signature = b64decode(signature) try: public_key.verify(signature, message, PKCS1v15(), SHA256()) except InvalidSignature: return False return True def rsa_encrypt(text, certificate): data = text.encode('UTF-8') public_key = certificate.public_key() cipherbyte = public_key.encrypt( plaintext=data, padding=OAEP(mgf=MGF1(algorithm=SHA1()), algorithm=SHA1(), label=None) ) return b64encode(cipherbyte).decode('UTF-8') def rsa_decrypt(ciphertext, private_key): data = private_key.decrypt( ciphertext=b64decode(ciphertext), padding=OAEP(mgf=MGF1(algorithm=SHA1()), algorithm=SHA1(), label=None) ) result = data.decode('UTF-8') return result def hmac_sign(key, sign_str): hmac = HMAC(key.encode('UTF-8'), SHA256()) hmac.update(sign_str.encode('UTF-8')) sign = hmac.finalize().hex().upper() return sign def sha256(data): hash = Hash(SHA256()) hash.update(data) return hash.finalize().hex() def sm3(data): hash = Hash(SM3()) hash.update(data) return hash.finalize().hex()