缘起

想给博客添加Newsletter功能,并且最好支持RSS转Newsletter,这样一来,只要博客更新,Newsletter能自动更新。

这样的服务挺多的,但多半收费,还不便宜,比如著名的Mailchimp

免费的也有,比如Mailbrew,我用了一段时间,但这个产品已不再维护,最近甚至无法登录,只得放弃。

自己动手,丰衣足食。于是开始折腾。

思路

实现RSS转Email,只需要两个功能:

  1. 表单工具,收集读者邮箱地址,需要支持退订
  2. 邮件发送服务,解析RSS内容使用邮件发出

第2点只需要找个邮件发送服务器,然后使用Python脚本,很容易解决。我选了亚马逊的AWS SES邮件服务,按发送量收费,每1000封邮件0.1刀,就我的博客更新频率和订阅量,成本可以忽略。

第1点,我不想(其实也不会)写前端代码,更不想做服务端开发,操作数据库。很自然的考虑找一个支持API操作的第三方数据库+表单。读者邮件列表是比较重要的数据,需要一个靠谱且容易管理的数据库,Notion的Database和Google Sheet是比较好的选择。二者区别:

  • Google Sheet自带表单工具,但国内无法访问
  • Notion国内可访问,但需要找第三方表单工具

我选了Notion,配合NotionForms,后者提供表单页面/组件,收集数据保存到Notion的Database,免费版不限收集的数据量,完全够用。

OK,广告时间到了,欢迎通过下列表单提交订阅,目前主要推送周刊内容,每周一早上更新。

(整个流程我自己测试都没问题,但毕竟下周一第一次推送,如果收到奇奇怪怪的内容,轻拍)

NotionForms免费版唯一的遗憾是不支持数据更新,意味着不能直接支持退订。

暂时的解决方案是:为退订单独提供一个表单及对应的Notion Database,收到退订申请后,手动删除订阅者列表。

如果退订太多,也可以基于Notion API写个脚本处理,以后考虑。

技术实现

基于上述思路,只需要Python脚本就能完成所有工作,仅三步:

  1. 读取Notion的Database中的Email列表
  2. 抓取RSS数据,获取博客更新
  3. 调用AWS SES发邮件

最后配置crontab定时任务,每天自动执行。

准备工作

当然,需要一些准备工作,大致如下,折腾的过程中没有一步步截图,大家将就看看:

  1. 准备Database:在Notion中创建两个Database,分别为订阅列表退订列表。两个列表都必须有名为「Email」的属性,且属性值类型为「Email」。
  2. 准备表单:使用NotionForms连接Notion,基于上述两个Database分别创建表单。关于表单分享方式,NotionForms支持独立的表单页面URL,也提供iframe嵌入。
  3. 准备Notion Token及权限:前往Notion开发者页面,创建一个integration,类型选择Internal,需要取一个名字,比如「NewsletterAPP」。

    1. 会生成一个Token,请保存好,之后Python脚本中需要使用。
    2. 回到Notion的「订阅列表」Database,点击Database菜单中的「Add connections」,找到刚刚创建的「NewsletterAPP」,添加授权。
  4. 准备AWS SES及权限创建账户啥的就不讲了,有几点注意:

    1. 新创建的账户处于沙盒之中,无法给外部发信,需要提交工单申请移出沙盒。工单中需要详细说明用途,发送量,退信处理方式等。
    2. 需要给账户创建IAM身份,然后获取相应的aws_access_key_idaws_secret_access_key,并保存到~/.aws/credentials。(参考
  5. 需要用到的第三方Python库

    1. requests:不解释
    2. feedparser:RSS解析库,非常好用
    3. boto3:AWS SES的官方库,发邮件很方便

完整代码

1、运行代码,需要把AWS SES的身份凭证需要放到~/.aws/credentials文件中,其它的信息全在代码中补充参数即可。

2、完整代码

#!/usr/local/bin/python3
# -*- coding: UTF-8 -*-

import boto3,feedparser,requests,re,datetime
from botocore.exceptions import ClientError
from time import mktime

# ++++++++++
# 各种参数准备
# ++++++++++

# Notion参数
EMAIL_DATABASE_ID = '' # 订阅列表的database id,在database的url中有
NOTION_API_TOKEN = '' # 在Notion的开发者页面integration中查看
EMAIL_DATABASE_URL = 'https://api.notion.com/v1/databases/{database_id}/query'.format(database_id=EMAIL_DATABASE_ID)
HEADERS = {
    'accept': 'application/json',
    'Authorization': 'Bearer {token}'.format(token=NOTION_API_TOKEN),
    'Notion-Version': '2022-06-28',
    'content-type': 'application/json'
}
PAGE_SIZE = 100

# AWS SES参数
REGION_NAME = 'ap-northeast-2' # AES SES地区,在AWS账户中有
SOURCE = 'Name <email_address>' # AWS SES账户中已经授权的发件人
client = boto3.client('ses',region_name=REGION_NAME)

# RSS地址及个人邮箱,邮件用于接收通知
NOTI_MYSELF_EMAIL = 'email_address' # 每次Newsletter发完后,会邮件通知此邮箱
BLOG_URL = 'https://www.skyue.com/category/weekly/' # 博客URL,在Newsletter正文的底部使用,类似于查看更多
RSS_URL = 'https://www.skyue.com/feed/category/weekly/' # 需要转Newsletter的RSS地址
AUTHOR_URL = 'https://www.skyue.com' # 作者的个人主页,Newsletter正文作者处有用到

# NotionForm取消订阅的表单地址,Newsletter正文底部用到
UNSCRIBE_URL = ''


# ++++++++++
# 开始实现功能
# ++++++++++

# 通过Notion接口,获取邮件列表
# Notion数据库中,邮件地址存放在Email列,且Email列的数据类型为Email
# Notion中,至少需要有一个邮件地址,代码未做无地址的兼容
# 返回数据组,存储邮件列表
def get_emails():
    payload = {"page_size": PAGE_SIZE}
    emails = []
    response = requests.post(EMAIL_DATABASE_URL, json=payload, headers=HEADERS)
    for result in response.json()['results']:
        if result['properties']['Email']['email']:
            emails.append(result['properties']['Email']['email'])
    next_cursor = response.json()['next_cursor']

    while next_cursor:
        payload = {"page_size": PAGE_SIZE, 'start_cursor': next_cursor}
        response = requests.post(EMAIL_DATABASE_URL, json=payload, headers=HEADERS)
        for result in response.json()['results']:
            if result['properties']['Email']['email']:
                emails.append(result['properties']['Email']['email'])
        next_cursor = response.json()['next_cursor']

    # 对邮件列表的数据进行格式校验和去重
    emails = list(set(filter_email(emails)))

    return emails

# 定义一个函数,用于检验邮箱地址格式,过滤不正确的邮箱
def filter_email(emails):
    pattern = r'(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)'
    result = []
    for e in emails:
        if re.match(pattern, e.strip()):
            result.append(e.strip())
    return result

# 通过RSS获取文章内容,并处理好HTML邮件正文,使用feedparser库
# 只获取最新一版文章,返回结果为字典,包括标题、链接、发布时间及文章正文
def get_article():
    rss_weekly = feedparser.parse(RSS_URL)
    title = rss_weekly['entries'][0].title
    link = rss_weekly['entries'][0].link
    content = rss_weekly['entries'][0].content[0].value.replace('<a href="', '<a style="color:#3354AA" href="')
    published = datetime.datetime.fromtimestamp(mktime(rss_weekly['entries'][0].published_parsed)) + datetime.timedelta(hours=8)

    # 根据文章字段,生成邮件的正文部分
    content_html = '''
    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
    <html xmlns="http://www.w3.org/1999/xhtml">
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
        <title>
            {title}
        </title>
        <meta name="viewport" content="width=device-width, initial-scale=1.0"/>
    </head>

    <body style="margin: 0; padding: 0; font-size: 1.2em; color:#111; text-decoration:none; ">
        <table cellpadding="0" cellspacing="0" width="100%">
            <tr>
                <td> 
                    <table align="center" border="0" cellpadding="5" cellspacing="0" width="600"  style="border-collapse: collapse; border-color:lightgray; ">
                        <tr>
                            <td>
                                <h1>
                                    <a style="color:black;text-decoration:none;" href="{link}">{title}</a>
                                </h1>
                                <p>
                                    <i>by <a style="color:black" href="{author_url}">拾月</a></i>
                                </p>
                                <p>
                                    {content}
                                </p>
                            </td>
                        </tr>
                        <tr>
                            <td>-- EOF --</td>
                        </tr>
                        <tr>
                            <td>
                                <p>
                                    链接:<a style="color:black;" href="{blog_url}">往期周刊</a> | 
                                    <a style="color:black;" href="{unsubscribe_url}">取消订阅</a>
                                </p>
                            </td>
                        </tr>    
                    </table>
                </td>
            </tr>
        </table>
    </body>
    </html>
    '''.format(
        title=title, 
        link=link, 
        content=content,
        blog_url = BLOG_URL,
        unsubscribe_url = UNSCRIBE_URL,
        author_url = AUTHOR_URL
        )    
    
    return {
        'title': title,
        'link': link,
        'published': published,
        'content_html': content_html
    }

# 定义一个日志写入函数,用于保存相关成功或错误信息到日志文件
def write_log(text):
    with open('email_send_log.txt', 'a') as f:
        f.write(text)


# 定义发送邮件的函数,参数:
# 类型:发送给自己的通知,还是发给读者的文章,会记录到日志中
# 收件人:
# 标题:
# 正文:
# 返回值:发送状态,True or False
def send_email(email_type, to_email, title, body):
    try:
        # 尝试使用aws ses发邮件
        response = client.send_email(
            Destination={
                'ToAddresses': [
                    to_email,
                ],
            },
            Message={
                'Body': {
                    'Html': {
                        'Charset': "UTF-8",
                        'Data': body
                    },
                },
                'Subject': {
                    'Charset': "UTF-8",
                    'Data': title,
                },
            },
            Source=SOURCE,
        )
    # 保存日志,成功或错误都保存,并返回发送状态
    except ClientError as e:
        log = '{dt} {email_type} to {email} error info: {error}\n'.format(
            dt=datetime.datetime.now(),
            error=e.response['Error']['Message'],
            email = to_email,
            email_type = email_type
            )
        write_log(log)
        return False
    else:
        log = "{dt} {email_type} to {email} success message_id: {messageid}\n".format(
            dt = datetime.datetime.now(), 
            messageid=response['MessageId'], 
            email = to_email,
            email_type = email_type
            )
        write_log(log)
        return True

# 批量发Newsletter,返回发送成功和失败的数量
def send_newsletter():
    success_cnt = 0
    failure_cnt = 0
    emails = get_emails()
    article = get_article()
    have_new_post = 0
    #判断文章时间,如果是当天的,才发
    if article['published'].strftime('%Y-%m-%d') == datetime.datetime.now().strftime('%Y-%m-%d'):
        have_new_post = 1
        for email_addr in emails:
            status = send_email('send_newsletter_email' ,email_addr, article['title'], article['content_html'])
            if status == True:
                success_cnt = success_cnt + 1
            else:
                failure_cnt = failure_cnt + 1

    return {
        'success_cnt': success_cnt,
        'failure_cnt': failure_cnt,
        'have_new_post': have_new_post
    }
    
    
if __name__ == '__main__':
    try:
        result = send_newsletter()
        # 记录脚本运营状态
        write_log('{dt} run_script success: success_cnt={success_cnt}, failure_cnt={failure_cnt}, new_post={have_new_post}\n'.format(
            dt=datetime.datetime.now(),
            success_cnt=result['success_cnt'],
            failure_cnt=result['failure_cnt'],
            have_new_post=result['have_new_post']
        ))
        # 邮件通知自己脚本的运行状态
        send_email('send_noti_myself' ,NOTI_MYSELF_EMAIL, 
                   'Newsletter脚本运行通知',
                   '<html>发送成功:{success_cnt},发送失败:{failure_cnt}, 更新数量:{have_new_post}</html>'.format(
                        success_cnt=result['success_cnt'],
                        failure_cnt=result['failure_cnt'],
                        have_new_post=result['have_new_post']
        ))
    except Exception as e:
        write_log('{dt} run_script error: {e}\n'.format(dt=datetime.datetime.now(),e=e))
        # 邮件通知自己脚本的运行状态
        send_email('send_noti_myself' ,NOTI_MYSELF_EMAIL, 
                   'Newsletter脚本运行通知',
                   '<html>脚本运行报错:{e}</html>'.format(e=e))

3、设置定时任务

比如我将代码保存在/sky/job/newsletter/newsletter.py,然后设置如下crontab任务,每周一早上8点执行。

0 8 * * 1 python3 /sky/job/newsletter/newsletter.py >> /sky/job/newsletter/crontab.log 2>&1
🔔 Email 或 RSS 订阅本博客

已有 14 条评论

  1. 一般服务类、产品类的需要newsletter,博客的到是很少见。

    1. 国外挺流行,有个substack,就是专门的newsletter内容平台。
      国内,还是不太有用邮箱接收信息的习惯。

  2. 之前看过newsletter转rss的,现在看到你rss转newsletter,可能面向的对象不同。

    1. 是的,就是针对不同对象。

  3. 飞书多维表格也是可以的,而且我已经实现自动采集和分发,后续考虑,文本转视频,然后视频平台自动分发,就是涉及技术节点多,只能等以后有时间搞一搞了,而且维护社群还是需要牵扯精力的。

    1. 顺便借楼打个广告,内容创意研报,分享个人收集感兴趣的资讯,欢迎订阅,博客地址:baigebg.com(白歌的小站)😱

      1. 倒是没想到飞书,应该是可以的。我不打算搞社群,单纯分享,然后有需要就在留言区或邮件交流下。害怕社交和群聊。
        你的网站打不开,不过好像是我公司网络原因。

        1. 邮件原来还能这么用,推送到微信才看到的,这个古老的信息渠道蛮神奇的,不过,应该是开发者专属吧。

          公司网络有风控,工作电脑也有,我同事之前离职的时候下了几个文档就被约谈了,后来,我就把自用和工作的隔离开了。😂

  4. 很好奇, Newsletter能带来收入么?

    1. 不能,小打小闹又随心所欲的写,不太可能赚钱。
      考虑赚钱,还是运营一个垂直领域的公众号比较靠谱。

  5. 大佬 厉害

    1. 哈哈,就这点爱好了,喜欢折腾。

  6. 又有新玩意可以折腾了!

    1. 哈哈,这下一劳永逸了,还便宜。

添加新评论