import requests import json import time import hmac import hashlib import base64 import urllib.parse from loguru import logger from email.header import Header from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.utils import parseaddr, formataddr import smtplib class TelegramSender(object): def __init__(self, config): self.bot_token = config.get('bot_token') self.chat_id = config.get('chat_id') def send_msg(self, msg): if not self.bot_token or not self.chat_id: return False url = "https://api.telegram.org/bot" + self.bot_token + "/sendMessage" data = { "chat_id": self.chat_id, "text": msg } req = requests.session() try: resp = req.post(url, data=data) result = json.loads(resp.text)['ok'] except Exception as e: logger.error('Telegram消息发送失败, 错误%s' % e) return False return result class DingDingSender(object): def __init__(self, config): self.ding_secret = config.get('ding_secret') self.dingding_base_url = config.get('dingding_base_url') def send_msg(self, msg): if not self.ding_secret or not self.dingding_base_url: return False timestamp = str(round(time.time() * 1000)) secret_enc = self.ding_secret.encode('utf-8') string_to_sign = '{}\n{}'.format(timestamp, self.ding_secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) dingding_url = self.dingding_base_url + "×tamp=" + timestamp + "&sign=" + sign headers = {'Content-Type': 'application/json;charset=utf-8'} body = { "msgtype": "markdown", "markdown": { "title": "提醒", "text": msg }, "at": { "isAtAll": "true" } } req = requests.Session() try: resp = req.post(dingding_url, headers=headers, json=body) errcode = json.loads(resp.text)['errcode'] result = True if errcode == 0 else False except Exception as e: logger.error('钉钉消息发送失败, 错误%s' % e) return False return result class MailSender(object): def __init__(self, config): self.from_add = config.get('from_add') self.to_add = config.get('to_add') self.auth = config.get('auth') self.smtp_server = config.get('smtp_server') self.smtp_port = config.get('smtp_port') @staticmethod def format_add(s): name, addr = parseaddr(s) return formataddr((Header(name, 'utf-8').encode(), addr)) def send_msg(self, message): if not self.from_add or not self.to_add or not self.auth or not self.smtp_server or not self.smtp_port: return False msg = MIMEMultipart('alternative') msg.attach(MIMEText(message, "plain", "utf-8")) msg['From'] = self.format_add("aliyun-sign自动签到 <%s>" % self.from_add) msg['To'] = self.format_add("云盘用户 <%s>" % self.to_add) msg['Subject'] = Header("阿里云盘自动签到", 'utf-8').encode() server = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) server.login(self.from_add, self.auth) server.sendmail(self.from_add, [self.to_add], msg.as_string()) server.quit() return True class MessageSender(object): def __init__(self): self.channel_handlers = {} def register_channel(self, channel_name, handler): self.channel_handlers[channel_name] = handler def unregister_channel(self, channel_name): self.channel_handlers.pop(channel_name, None) def channel_handle(self, channel_name): return self.channel_handlers.get(channel_name, None) def send(self, msg): for channel_name in self.channel_handlers: channel = self.channel_handle(channel_name) if not channel or not hasattr(channel, "send_msg"): continue result = channel.send_msg(msg) logger.info('渠道: %s 消息发送结果: %s' % (channel_name, "成功" if result else "失败"))