やってみる

アウトプットすべく己を導くためのブログ。その試行錯誤すらたれ流す。

PythonでRSSからHTMLの本文を抽出してSQLite3に挿入する(重複してしまう版)

 本文抽出がうまくいかないだろうから、1件だけ登録する。

成果物

ソフトウェア構成

  • run.sh
  • get_news.py
  • mod/
    • NewsDb.py
    • NewsImagesDb.py
    • get_html.py
    • HtmlContentExtractor.py

出力結果

 カレントディレクトリにSQLite3のDBファイルnews.dbが作成される。内容は指定したRSSの公開日時、URL、タイトル、本文。

コード

run.sh

 Pythonコードを実行するためのシェルスクリプト。ターミナルで実行したときのカレントディレクトリを渡すのが主な仕事。

[ $# -lt 1 ] && { echo '第1引数にRSSのURLを指定してください。' 1>&2; exit 1; }
SCRIPT_DIR=$(cd $(dirname $0); pwd)
cd "$SCRIPT_DIR"
python3 get_news.py "$1" "$SCRIPT_DIR"

 使うときは以下。

run.sh 任意RSSのURL

get_news.py

 ニュースを取得してSQLite3へ保存する。

#!/usr/bin/env python3
# coding: utf8
import feedparser
import datetime
import sys
from mod import get_html
from mod import NewsDb
from mod import NewsImagesDb
from mod import HtmlContentExtractor

if len(sys.argv) < 2:
    raise Error('第1引数にRSSのURLを指定してください。')
    exit()
if len(sys.argv) < 3:
    raise Error('第2引数にSQLite3DBルートパスを指定してください。')
    exit()
rss = sys.argv[1]
db_dir_path = sys.argv[2]

entries = feedparser.parse(rss).entries
news_db = NewsDb.NewsDb(db_dir_path)
extractor = HtmlContentExtractor.HtmlContentExtractor()
for entry in entries:
    published = (datetime.datetime
        .strptime(entry.published, 
                  '%a, %d %b %Y %H:%M:%S %z')
        .strftime('%Y-%m-%dT%H:%M:%SZ%z'))
    url = entry.link
    title = entry.title
    body = extractor.extract(get_html.get_html(url))
    news_db.append_insert_stmt(published, url, title, body);
    break; # HTML取得を1件だけでやめる
news_db.insert();

NewsDb.py

 ニュース用DB。テーブル作成とレコード挿入する。

import sqlite3
import os

class NewsDb:
    def __init__(self, root):
        path = os.path.join(root, 'news.db')
        self.conn = sqlite3.connect(path)
        self.create_table()
        self.stmts = []
    def __del__(self): self.conn.close()
    def create_table(self):    
        cur = self.conn.cursor()
        cur.executescript(self.__create_table_sql())
    def __create_table_sql(self):
        return '''
create table if not exists news(
  id         integer primary key,
  published  text,
  url        text,
  title      text,
  body       text  -- URL先から本文だけを抽出したプレーンテキスト
);
create index if not exists idx_news on 
  news(published desc, id desc, url, title);
create table if not exists sources(
  id       integer primary key,
  domain   text, -- URLのドメイン名
  name     text, -- 情報源名
  created  text  -- 登録日時(同一ドメイン名が複数あるとき新しいほうを表示する)
);
create index if not exists idx_sources on 
  sources(domain, created desc, id desc, name);
'''
    def __insert_sql(self): 
        return 'insert into news(published,url,title,body) values(?,?,?,?)'
    def append_insert_stmt(self, published, url, title, body):
        self.stmts.append((published, url, title, body))
    def insert(self):
        if 0 == len(self.stmts): return
        try:
            cur = self.conn.cursor()
            cur.executemany(self.__insert_sql(), self.stmts)
            self.conn.commit()
            self.stmts.clear()
        except: 
            import traceback
            traceback.print_exc()
            self.conn.rollback()

NewsImagesDb.py

 ニュース画像用DB。今回は使っていない。

import sqlite3
import os

class NewsImagesDb:
    def __init__(self, root):
        path = os.path.join(root, 'news_images.db')
        self.conn = sqlite3.connect()
        self.create_table()
        self.stmts = []
    def __del__(self): self.conn.close()
    def create_table(self):    
        cur = self.conn.cursor()
        cur.execute(self.__create_table_sql())
    def __create_table_sql(self):
        return '''
create table if not exists images(
  news_id integer, -- どの記事に対応した画像か
  url     text,    -- 拡張子も含めているはず。これ重要
  image   blob     -- バイナリ
);'''
    def append_insert_stmt(self, news_id, url, image):
        self.stmts.append("insert into images(news_id, url, image) values("
            + "'" + news_id + "',"
            + "'" + url       + "',"
            + "'" + image      + "'"
            + ");");
    def insert(self):
        if 0 == len(self.stmts): return
        self.stmts.insert(0, "begin;")
        self.stmts.append("end;")
        cur = self.conn.cursor()
        cur.execute("\n".join(stmts))
        self.stmts.clear()

get_html.py

 URLからHTMLを取得する。UTF-8に変換して。

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup
import time

def get_html(url, wait_second=1):
    options = Options()
    options.set_headless(True)
    driver = webdriver.Chrome(chrome_options=options)
    driver.get(url)
    time.sleep(wait_second)
    html = driver.page_source.encode('utf-8').decode('utf-8')
    return html

HtmlContentExtractor.py

 本文を抽出する。HTMLとプレーンテキスト形式で。

import sys
import os
import extractcontent3

class HtmlContentExtractor:
    def __init__(self, option=None):
        self.__html = None
        self.__text = None
        self.__extractor = extractcontent3.ExtractContent()
        if option is not None: self.__extractor.set_option(option) # option = {"threshold":50}
    @property
    def Html(self): return self.__html
    @property
    def Text(self): return self.__text
    def extract(self, html):
        self.__extractor.analyse(html)
#        text, title = extractor.as_text()
        self.__html, title = self.__extractor.as_html()
#        title = extractor.extract_title(html)
        self.__text = self.__format_to_text(html)
        return self.__text
    def __format_to_text(self, html):
        import re
        import unicodedata
        st = re.sub(r"<p>([^ ])", r" \1", html) # 段落の先頭は全角スペース
        st = re.sub(r"</p>", "\n\n", st) # 段落の末尾は2つ改行する
        st = re.sub(r"</br>", "\n", st)
        st = re.sub(r"<br>", "\n", st)
        st = re.sub(r"<.+?>", "", st)
        # Convert from wide character to ascii
        if st and type(st) != str: st = unicodedata.normalize("NFKC", st)
        st = re.sub(r"[\u2500-\u253f\u2540-\u257f]", "", st)  # 罫線(keisen)
#        st = re.sub(r"&(.*?);", lambda x: self.CHARREF.get(x.group(1), x.group()), st)
        st = re.sub(r"[ \t]+", " ", st)
        return st.rstrip("\n\t ")
    def __show_meta(self):
        print('extractcontent3 メタ情報')
        print(extractcontent3.__version__)
        print(extractcontent3.__file__)
        print(dir(extractcontent3))

つまづきポイント

sqlite3

cur.execute('create table ...; ...;')
    cur.execute(self.__create_table_sql())
sqlite3.Warning: You can only execute one statement at a time.

 executeメソッドは1度に1ステートメントのみ実行可能。だが、それでは超低速になってしまう。

 executemanyメソッドを使うべき。

try:
    cur.executemany('insert into T(A,B) values(?,?)', [(0,'A'),(1,'B'),(2,'C')])
    conn.commit()
except:
    conn.rollback()

 または、1度に複数のステートメントを実行させたいならexecutescriptメソッドを使う。

cur.executescript('create table ...; ...;')

問題

  • RSSによる違い
    • 日付フォーマット
    • 本文が取得できない
      • 会員登録、ログイン、「続きを読む」などに阻まれる

 この問題が致命的なため、DBに取り込むのは1件だけにしている。様子見。

対象環境

$ uname -a
Linux raspberrypi 4.19.42-v7+ #1218 SMP Tue May 14 00:48:17 BST 2019 armv7l GNU/Linux

前回まで