网络知识 娱乐 企业微信机器人消息发送(文本、图片、文件)

企业微信机器人消息发送(文本、图片、文件)

文章目录

  • 一、企业微信机器人用途
  • 二、前置条件
    • 获取webhookurl
  • 三、方法实现
    • 1.发送文本消息方法
    • 2.发送图片方法
    • 3.发送文件方法
  • 四、整体代码


一、企业微信机器人用途

本文记录将企业微信机器人应用在接口自动化测试过程中,对项目初始化异常报警以及执行结束将报告输出。
企业微信机器人API文档:https://developer.work.weixin.qq.com/document/path/92455


二、前置条件

获取webhookurl

具有后台权限的管理员添加机器人后,可读取到机器人特有的webhookurl。将机器人关联到某一群组中,请求webhookurl时,机器人将携带的消息发送至该群。
将机器人信息写入配置文件***.ini中:

[ALARM_ROBOT]
# 存放webhookurl
robot_curl=https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=****
# 存放需要@的成员的手机号码
mentioned_mobile_list=18*********

三、方法实现

1.发送文本消息方法

构造请求体,txt为待发送文本;mentioned_mobile_list为需要@的成员的手机号码,通过配置文件读取。未添加对返回报文的校验,只写入日志。

代码如下(示例):

    def send_txt(self, txt: str):
        """
        企业微信机器人发送文本,例:"广州今日天气:29度,大部分多云,降雨概率:60%"
        :param txt: 待发送文本内容
        :return:
        """
        self.req_json["msgtype"] = "text"
        self.req_json["text"] = {}
        self.req_json["text"]["content"] = txt
        self.req_json["text"]["mentioned_mobile_list"] = [phone for phone in self.robot_info['mentioned_mobile_list'].split(',')]
        try:
            resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
            mylog.info(f"企业微信机器人发送消息:req_json: {self.req_json}n"
                       f"response: {resp.text}")
        except Exception:
            mylog.exception(f"企业微信机器人发送信息失败,text: {txt}")

2.发送图片方法

需要对图片进行base64和md5转换,附转换方法。

代码如下(示例):

    def send_pic(self, pic_path: str) -> bool:
        """
        企业微信机器人发送图片
        :param pic_path: 图片路径
        :return:
        """
        self.req_json["msgtype"] = "image"
        self.req_json["image"] = {}
        self.req_json["image"]["base64"] = path2base64(pic_path)
        self.req_json["image"]["md5"] = path2md5(pic_path)
        try:
            resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
            mylog.info(f"企业微信机器人发送图片:req_json: {self.req_json}n"
                       f"response: {resp.text}")
        except Exception:
            mylog.exception(f"企业微信机器人发送图片失败,req_json: {self.req_json}")

base64与md5转换方法:

import base64
import hashlib
def path2base64(path: str) -> str:
    """
    文件转换为base64
    :param path: 文件路径
    :return:
    """
    with open(path, "rb") as f:
        byte_data = f.read()
    base64_str = base64.b64encode(byte_data).decode("ascii")    # 二进制转base64
    return base64_str


def path2md5(path: str) -> str:
    """
    文件转换为md5
    :param path: 文件路径
    :return:
    """
    with open(path, "rb") as f:
        byte_data = f.read()
    md5_str = md5(byte_data)
    return md5_str


def md5(text: all) -> str:
    """
    md5加密
    :param text:
    :return:
    """
    m = hashlib.md5()
    m.update(text)
    return m.hexdigest()

3.发送文件方法

支持所有文件类型,excel/word/ppt/html/zip等等,发送文件前需要先上传文件。

上传文件方法:

    def upload_file(self, file_path: str) -> str:
        """
        企业微信机器人上传文件,发送文件前需要现上传
        :param file_path: 文件路径
        :return:
        """
        try:
            data = {'file': open(file_path, 'rb')}
            resp = requests.post(self.upload_url, files=data)
            json_res = resp.json()
            if json_res.get('media_id'):
                mylog.info(f"企业微信机器人上传文件成功,file:{file_path}")
                return json_res.get('media_id')
        except Exception:
            mylog.exception(f"企业微信机器人上传文件失败,file: {file_path}")
            return ""

发送文件方法:

    def send_file(self, file_path: str) -> bool:
        """
        企业微信机器人发送文件,例如:report.html
        :param file_path: 文件路径
        :return:
        """
        try:
            self.req_json["msgtype"] = "file"
            self.req_json["file"] = {}
            self.req_json["file"]["media_id"] = self.upload_file(file_path)
            resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
            mylog.info(f"企业微信机器人发送文件:req_json: {self.req_json}n"
                       f"response: {resp.text}")
        except Exception:
            mylog.exception(f"企业微信机器人发送文件失败,req_json: {self.req_json}")

四、整体代码

import re
import requests
from base.init_data import InitData
from base.my_logger import MyLogger
from common.my_function import path2md5, path2base64

mylog = MyLogger()


class WechatRobot(InitData):
    """
    企业微信机器人发送消息类,当前支持文本、图片、文件,根据腾讯提供的API文档还支持图文、markdown、模板卡片等,待有需求时添加。
    每个机器人发送的消息不能超过20条/分钟。
    企业微信机器人API文档:https://developer.work.weixin.qq.com/document/path/92455
    """

    def __init__(self):
        self.headers = {'Content-Type': 'application/json'}
        self.robot_curl = self.robot_info['robot_curl']
        self.upload_url = f'https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={re.findall("key=(.*)&*", self.robot_curl)[0]}&type=file'
        self.req_json = {}

    def send_txt(self, txt: str) -> bool:
        """
        企业微信机器人发送文本,例:"广州今日天气:29度,大部分多云,降雨概率:60%"
        :param txt: 待发送文本内容
        :return:
        """
        self.req_json["msgtype"] = "text"
        self.req_json["text"] = {}
        self.req_json["text"]["content"] = txt
        self.req_json["text"]["mentioned_mobile_list"] = [phone for phone in
                                                          self.robot_info['mentioned_mobile_list'].split(',')]
        try:
            resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
            mylog.info(f"企业微信机器人发送消息:req_json: {self.req_json}n"
                       f"response: {resp.text}")
        except Exception:
            mylog.exception(f"企业微信机器人发送信息失败,text: {txt}")

    def send_pic(self, pic_path: str) -> bool:
        """
        企业微信机器人发送图片
        :param pic_path: 图片路径
        :return:
        """
        self.req_json["msgtype"] = "image"
        self.req_json["image"] = {}
        self.req_json["image"]["base64"] = path2base64(pic_path)
        self.req_json["image"]["md5"] = path2md5(pic_path)
        try:
            resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
            mylog.info(f"企业微信机器人发送图片:req_json: {self.req_json}n"
                       f"response: {resp.text}")
        except Exception:
            mylog.exception(f"企业微信机器人发送图片失败,req_json: {self.req_json}")

    def send_file(self, file_path: str) -> bool:
        """
        企业微信机器人发送文件,例如:report.html
        :param file_path: 文件路径
        :return:
        """
        try:
            self.req_json["msgtype"] = "file"
            self.req_json["file"] = {}
            self.req_json["file"]["media_id"] = self.upload_file(file_path)
            resp = requests.post(url=self.robot_curl, headers=self.headers, json=self.req_json)
            mylog.info(f"企业微信机器人发送文件:req_json: {self.req_json}n"
                       f"response: {resp.text}")
        except Exception:
            mylog.exception(f"企业微信机器人发送文件失败,req_json: {self.req_json}")

    def upload_file(self, file_path: str) -> str:
        """
        企业微信机器人上传文件,发送文件前需要现上传
        :param file_path: 文件路径
        :return:
        """
        try:
            data = {'file': open(file_path, 'rb')}
            resp = requests.post(self.upload_url, files=data)
            json_res = resp.json()
            if json_res.get('media_id'):
                mylog.info(f"企业微信机器人上传文件成功,file:{file_path}")
                return json_res.get('media_id')
        except Exception:
            mylog.exception(f"企业微信机器人上传文件失败,file: {file_path}")
            return ""


if __name__ == '__main__':
    wechat_robot = WechatRobot()
    file_path = './testdatas.zip'
    wechat_robot.send_file(file_path)