123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280 |
- # coding=utf-8
- import requests
- import json
- from django.conf import settings
- from utils.exceptions import CustomError
- from utils.file_operation import PathAndRename
- class WeChat(object):
- @staticmethod
- def getPreAuthCode(component_appid, component_access_token):
- """
- 第三方平台 获得预授权码
- 结果{"pre_auth_code": "Cx_Dk6qiBE0Dmx4EmlT3oRfArPvwSQ-oa3NL_fwHM7VI08r52wazoZX2Rhpz1dEw", "expires_in": 600}
- """
- url = 'https://api.weixin.qq.com/cgi-bin/component/api_create_preauthcode?component_access_token=' + component_access_token
- param = {
- 'component_appid': component_appid,
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def getComponentAccessToken(component_appid, component_appsecret, component_verify_ticket):
- """第三方平台 获得access_token"""
- url = 'https://api.weixin.qq.com/cgi-bin/component/api_component_token'
- param = {
- 'component_appid': component_appid,
- 'component_appsecret': component_appsecret,
- 'component_verify_ticket': component_verify_ticket
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败'+ str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def getAuthorizationInfo(component_appid, authorization_code, component_access_token):
- """第三方平台 获得授权信息"""
- url = 'https://api.weixin.qq.com/cgi-bin/component/api_query_auth?component_access_token=' + component_access_token
- param = {
- 'component_appid': component_appid,
- 'authorization_code': authorization_code,
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败'+ str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def getAuthorizerInfo(component_appid, authorizer_appid, component_access_token):
- """第三方平台 获得授权方信息"""
- url = 'https://api.weixin.qq.com/cgi-bin/component/api_get_authorizer_info?component_access_token=' + component_access_token
- param = {
- 'component_appid': component_appid,
- 'authorizer_appid': authorizer_appid,
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败'+ str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def getAuthorizerAccessToken(component_appid, authorizer_appid, authorizer_refresh_token, component_access_token):
- """第三方平台 获得授权方 access_token"""
- url = 'https://api.weixin.qq.com/cgi-bin/component/api_authorizer_token?component_access_token=' + component_access_token
- param = {
- 'component_appid': component_appid,
- 'authorizer_appid': authorizer_appid,
- 'authorizer_refresh_token': authorizer_refresh_token
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败'+ str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def getCodeTemplateList(component_access_token):
- '''获取代码模版列表'''
- url = 'https://api.weixin.qq.com/wxa/gettemplatelist?access_token=' + component_access_token
- result = requests.get(url)
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败'+ str(result['errcode']) + ' :' + result['errmsg'])
- return result['template_list']
- @staticmethod
- def commitCode(access_token, template_id, user_version, user_desc):
- '''小程序提交代码'''
- url = 'https://api.weixin.qq.com/wxa/commit?access_token=' + access_token
- param = {
- 'template_id': template_id,
- 'ext_json': "{\"extEnable\": false,\"extAppid\": \"\"}",
- 'user_version': user_version,
- 'user_desc': 'test'
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败'+ str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def submitAuditCode(access_token):
- '''将上传的代码提交审核'''
- url = 'https://api.weixin.qq.com/wxa/submit_audit?access_token=' + access_token
- result = requests.post(url)
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def getLastSubmitAuditCodeStatus(access_token):
- '''查询最新一次提交审核代码的审核状态'''
- url = 'https://api.weixin.qq.com/wxa/get_latest_auditstatus?access_token=' + access_token
- result = requests.get(url)
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def modify_domain(access_token, action, requestdomain, wsrequestdomain, uploaddomain, downloaddomain):
- '''设置小程序服务器域名'''
- url = 'https://api.weixin.qq.com/wxa/modify_domain?access_token=' + access_token
- param = {
- "action": action,
- "requestdomain": requestdomain,
- "wsrequestdomain": wsrequestdomain,
- "uploaddomain": uploaddomain,
- "downloaddomain": downloaddomain
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ' :' + result['errmsg'])
- return result
- @staticmethod
- def code2Session(appid, secret, code):
- '''登录凭证校验'''
- url = 'https://api.weixin.qq.com/sns/jscode2session?appid='+ appid + '&secret=' + secret + '&js_code=' + code + '&grant_type=authorization_code'
- result = requests.get(url)
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ':' + result['errmsg'])
- return result
- @staticmethod
- def getAccessToken(appid, secret,):
- '''获取小程序验证token'''
- url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid='+ appid + '&secret=' + secret
- result = requests.get(url)
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ':' + result['errmsg'])
- return result
- @staticmethod
- def addPlugin(access_token):
- '''小程序插件管理'''
- url = 'https://api.weixin.qq.com/wxa/plugin?access_token=' + access_token
- param = {
- 'action': 'apply',
- 'plugin_appid': 'wxfa43a4a7041a84de'
- }
- result = requests.post(url, data=json.dumps(param))
- result = result.json()
- if 'errcode' in result and result['errcode'] and str(result['errcode']) != '89237':
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ':' + result['errmsg'])
- return result
- @staticmethod
- def getTemplateList(access_token):
- '''获取当前帐号下的个人模板列表'''
- url = 'https://api.weixin.qq.com/wxaapi/newtmpl/gettemplate?access_token=' + access_token
- result = requests.get(url)
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ':' + result['errmsg'])
- return result['data']
- @staticmethod
- def sendSubscribeMessage(access_token, openid, template_id, page, data):
- '''发送订阅消息'''
- url = 'https://api.weixin.qq.com/cgi-bin/message/subscribe/send?access_token=' + access_token
- param = {
- 'touser': openid,
- 'template_id': template_id,
- 'page': page,
- 'data': data
- }
- result = requests.post(url, data=json.dumps(param))
- # print('-------------------------')
- # result = result.json()
- # print(result)
- #result = requests.post(url, data=json.dumps(param))
- #result = result.json()
- #if 'errcode' in result and result['errcode'] and str(result['errcode']) != '89237':
- # raise CustomError(u'微信请求失败' + str(result['errcode']) + ':' + result['errmsg'])
- #return result
- @staticmethod
- def getWXAppCode(access_token, page, company_no):
- '''获取小程序二维码'''
- url = 'https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token={}'.format(access_token)
- data = {"scene": "company_no={}".format(company_no),
- "width": 1280,
- "line_color": {"r": 43, "g": 162, "b": 69}, # 自定义颜色
- "is_hyaline": True}
- result = requests.post(url, json=data)
- # print('-------------------------',result)
- upload_path = PathAndRename("company/")
- filename = "%s%s.png" % (
- upload_path.path,
- company_no,
- )
- filename = filename.lower()
- full_filename = "%s%s" % (settings.MEDIA_ROOT, filename)
- with open(full_filename, 'wb') as destination:
- destination.write(result.content)
- return filename
- @staticmethod
- def getDeviceCode(access_token, page, device_id, company_no):
- '''获取设备二维码'''
- url = 'https://api.weixin.qq.com/wxa/getwxacodeunlimit?access_token={}'.format(access_token)
- data = {"scene": "company_no={}&device_id={}".format(company_no, device_id),
- "width": 1280,
- "line_color": {"r": 43, "g": 162, "b": 69}, # 自定义颜色
- "is_hyaline": True}
- result = requests.post(url, json=data)
- upload_path = PathAndRename("device/")
- filename = "%s%s.png" % (
- upload_path.path,
- device_id,
- )
- filename = filename.lower()
- full_filename = "%s%s" % (settings.MEDIA_ROOT, filename)
- with open(full_filename, 'wb') as destination:
- destination.write(result.content)
- return filename
- @staticmethod
- def releaseCode(access_token):
- '''已审核代码发布'''
- url = 'https://api.weixin.qq.com/wxa/release?access_token=' + access_token
- result = requests.post(url, data=json.dumps({}))
- result = result.json()
- if 'errcode' in result and result['errcode']:
- raise CustomError(u'微信请求失败' + str(result['errcode']) + ' :' + result['errmsg'])
- return result
|