Aliyunsign/MsgSender.py

135 lines
4.3 KiB
Python

import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from loguru import logger
from email.header import Header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.utils import parseaddr, formataddr
import smtplib
class TelegramSender(object):
def __init__(self, config):
self.bot_token = config.get('bot_token')
self.chat_id = config.get('chat_id')
def send_msg(self, msg):
if not self.bot_token or not self.chat_id:
return False
url = "https://api.telegram.org/bot" + self.bot_token + "/sendMessage"
data = {
"chat_id": self.chat_id,
"text": msg
}
req = requests.session()
try:
resp = req.post(url, data=data)
result = json.loads(resp.text)['ok']
except Exception as e:
logger.error('Telegram消息发送失败, 错误%s' % e)
return False
return result
class DingDingSender(object):
def __init__(self, config):
self.ding_secret = config.get('ding_secret')
self.dingding_base_url = config.get('dingding_base_url')
def send_msg(self, msg):
if not self.ding_secret or not self.dingding_base_url:
return False
timestamp = str(round(time.time() * 1000))
secret_enc = self.ding_secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, self.ding_secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
dingding_url = self.dingding_base_url + "&timestamp=" + timestamp + "&sign=" + sign
headers = {'Content-Type': 'application/json;charset=utf-8'}
body = {
"msgtype": "markdown",
"markdown": {
"title": "提醒",
"text": msg
},
"at": {
"isAtAll": "true"
}
}
req = requests.Session()
try:
resp = req.post(dingding_url, headers=headers, json=body)
errcode = json.loads(resp.text)['errcode']
result = True if errcode == 0 else False
except Exception as e:
logger.error('钉钉消息发送失败, 错误%s' % e)
return False
return result
class MailSender(object):
def __init__(self, config):
self.from_add = config.get('from_add')
self.to_add = config.get('to_add')
self.auth = config.get('auth')
self.smtp_server = config.get('smtp_server')
self.smtp_port = config.get('smtp_port')
@staticmethod
def format_add(s):
name, addr = parseaddr(s)
return formataddr((Header(name, 'utf-8').encode(), addr))
def send_msg(self, message):
if not self.from_add or not self.to_add or not self.auth or not self.smtp_server or not self.smtp_port:
return False
msg = MIMEMultipart('alternative')
msg.attach(MIMEText(message, "plain", "utf-8"))
msg['From'] = self.format_add("aliyun-sign自动签到 <%s>" % self.from_add)
msg['To'] = self.format_add("云盘用户 <%s>" % self.to_add)
msg['Subject'] = Header("阿里云盘自动签到", 'utf-8').encode()
server = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
server.login(self.from_add, self.auth)
server.sendmail(self.from_add, [self.to_add], msg.as_string())
server.quit()
return True
class MessageSender(object):
def __init__(self):
self.channel_handlers = {}
def register_channel(self, channel_name, handler):
self.channel_handlers[channel_name] = handler
def unregister_channel(self, channel_name):
self.channel_handlers.pop(channel_name, None)
def channel_handle(self, channel_name):
return self.channel_handlers.get(channel_name, None)
def send(self, msg):
for channel_name in self.channel_handlers:
channel = self.channel_handle(channel_name)
if not channel or not hasattr(channel, "send_msg"):
continue
result = channel.send_msg(msg)
logger.info('渠道: %s 消息发送结果: %s' % (channel_name, "成功" if result else "失败"))