utils.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # -*- coding: utf-8 -*-
  2. import json
  3. import time
  4. import uuid
  5. from base64 import b64decode, b64encode
  6. from cryptography.exceptions import InvalidSignature, InvalidTag
  7. from cryptography.hazmat.backends import default_backend
  8. from cryptography.hazmat.primitives.asymmetric.padding import MGF1, OAEP, PKCS1v15
  9. from cryptography.hazmat.primitives.ciphers.aead import AESGCM
  10. from cryptography.hazmat.primitives.hashes import SHA1, SHA256, SM3, Hash
  11. from cryptography.hazmat.primitives.hmac import HMAC
  12. from cryptography.hazmat.primitives.serialization import load_pem_private_key
  13. from cryptography.x509 import load_pem_x509_certificate
  14. def build_authorization(path, method, mchid, serial_no, private_key, data=None, nonce_str=None):
  15. '''
  16. 一、构建签名串
  17. 签名串一共有五行,每一行为一个参数。行尾以 \n(换行符,ASCII编码值为0x0A)结束,包括最后一行。如果参数本身以\n结束,也需要附加一个\n。
  18. 第一步,获取HTTP请求的方法(GET, POST, PUT)等
  19. 第二步,获取请求的绝对URL,并去除域名部分得到参与签名的URL。如果请求中有查询参数,URL末尾应附加有'?'和对应的查询字符串。
  20. 第三步,获取发起请求时的系统当前时间戳,即格林威治时间1970年01月01日00时00分00秒(北京时间1970年01月01日08时00分00秒)起至现在的总秒数,作为请求时间戳。微信支付会拒绝处理很久之前发起的请求,请商户保持自身系统的时间准确。
  21. 第四步,生成一个请求随机串,可参见生成随机数算法。这里,我们使用命令行直接生成一个。
  22. 第五步,获取请求中的请求报文主体(request body)。
  23. 请求方法为GET时,报文主体为空。
  24. 当请求方法为POST或PUT时,请使用真实发送的JSON报文。
  25. 图片上传API,请使用meta对应的JSON报文。
  26. 对于下载证书的接口来说,请求报文主体是一个空串。
  27. 二、计算签名值
  28. 绝大多数编程语言提供的签名函数支持对签名数据进行签名。强烈建议商户调用该类函数,使用商户私钥对待签名串进行SHA256 with RSA签名,并对签名结果进行Base64编码得到签名值。
  29. 三、设置HTTP头
  30. 微信支付商户API V3要求通过HTTP Authorization头来传递签名 Authorization由认证类型和签名信息两部分组成
  31. 1.认证类型,目前为WECHATPAY2-SHA256-RSA2048
  32. 2.签名信息
  33. 发起请求的商户(包括直连商户、服务商或渠道商)的商户号 mchid
  34. 商户API证书序列号serial_no,用于声明所使用的证书
  35. 请求随机串nonce_str
  36. 时间戳timestamp
  37. 签名值signature
  38. 注:以上五项签名信息,无顺序要求。
  39. :param path:
  40. :param method:
  41. :param mchid:
  42. :param serial_no:
  43. :param private_key:
  44. :param data:
  45. :param nonce_str:
  46. :return:
  47. '''
  48. timeStamp = str(int(time.time()))
  49. nonce_str = nonce_str or ''.join(str(uuid.uuid4()).split('-')).upper()
  50. body = data if isinstance(data, str) else json.dumps(data) if data else ''
  51. sign_str = '%s\n%s\n%s\n%s\n%s\n' % (method, path, timeStamp, nonce_str, body)
  52. signature = rsa_sign(private_key=private_key, sign_str=sign_str)
  53. authorization = 'WECHATPAY2-SHA256-RSA2048 mchid="%s",nonce_str="%s",signature="%s",timestamp="%s",serial_no="%s"' % (mchid, nonce_str, signature, timeStamp, serial_no)
  54. return authorization
  55. def rsa_sign(private_key, sign_str):
  56. message = sign_str.encode('UTF-8')
  57. signature = private_key.sign(data=message, padding=PKCS1v15(), algorithm=SHA256())
  58. sign = b64encode(signature).decode('UTF-8').replace('\n', '')
  59. return sign
  60. def aes_decrypt(nonce, ciphertext, associated_data, apiv3_key):
  61. key_bytes = apiv3_key.encode('UTF-8')
  62. nonce_bytes = nonce.encode('UTF-8')
  63. associated_data_bytes = associated_data.encode('UTF-8')
  64. data = b64decode(ciphertext)
  65. aesgcm = AESGCM(key=key_bytes)
  66. try:
  67. result = aesgcm.decrypt(nonce=nonce_bytes, data=data, associated_data=associated_data_bytes).decode('UTF-8')
  68. except InvalidTag:
  69. result = None
  70. return result
  71. def format_private_key(private_key_str):
  72. pem_start = '-----BEGIN PRIVATE KEY-----\n'
  73. pem_end = '\n-----END PRIVATE KEY-----'
  74. if not private_key_str.startswith(pem_start):
  75. private_key_str = pem_start + private_key_str
  76. if not private_key_str.endswith(pem_end):
  77. private_key_str = private_key_str + pem_end
  78. return private_key_str
  79. def load_certificate(certificate_str):
  80. try:
  81. return load_pem_x509_certificate(data=certificate_str.encode('UTF-8'), backend=default_backend())
  82. except:
  83. return None
  84. def load_private_key(private_key_str):
  85. try:
  86. return load_pem_private_key(data=format_private_key(private_key_str).encode('UTF-8'), password=None, backend=default_backend())
  87. except:
  88. raise Exception('商户证书私钥加载失败!')
  89. def rsa_verify(timestamp, nonce, body, signature, certificate):
  90. sign_str = '%s\n%s\n%s\n' % (timestamp, nonce, body)
  91. public_key = certificate.public_key()
  92. message = sign_str.encode('UTF-8')
  93. signature = b64decode(signature)
  94. try:
  95. public_key.verify(signature, message, PKCS1v15(), SHA256())
  96. except InvalidSignature:
  97. return False
  98. return True
  99. def rsa_encrypt(text, certificate):
  100. data = text.encode('UTF-8')
  101. public_key = certificate.public_key()
  102. cipherbyte = public_key.encrypt(
  103. plaintext=data,
  104. padding=OAEP(mgf=MGF1(algorithm=SHA1()), algorithm=SHA1(), label=None)
  105. )
  106. return b64encode(cipherbyte).decode('UTF-8')
  107. def rsa_decrypt(ciphertext, private_key):
  108. data = private_key.decrypt(
  109. ciphertext=b64decode(ciphertext),
  110. padding=OAEP(mgf=MGF1(algorithm=SHA1()), algorithm=SHA1(), label=None)
  111. )
  112. result = data.decode('UTF-8')
  113. return result
  114. def hmac_sign(key, sign_str):
  115. hmac = HMAC(key.encode('UTF-8'), SHA256())
  116. hmac.update(sign_str.encode('UTF-8'))
  117. sign = hmac.finalize().hex().upper()
  118. return sign
  119. def sha256(data):
  120. hash = Hash(SHA256())
  121. hash.update(data)
  122. return hash.finalize().hex()
  123. def sm3(data):
  124. hash = Hash(SM3())
  125. hash.update(data)
  126. return hash.finalize().hex()