import concurrent.futures
import csv
import threading
import time
import tkinter as tk
from tkinter import ttk, filedialog, messagebox
from tkinter.scrolledtext import ScrolledText
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
# Desktop-Chrome User-Agent sent with every request (some sites serve a
# degraded page to unknown agents).
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/124.0.0.0 Safari/537.36"
)
# Per-request timeout in seconds passed to requests.get().
DEFAULT_TIMEOUT = 15
# Number of concurrent fetch threads in the ThreadPoolExecutor.
MAX_WORKERS = 5
# Extra attempts after the first failure (total attempts = RETRY_COUNT + 1).
RETRY_COUNT = 2
def fetch_url(url: str) -> tuple[str, str, str]:
    """Fetch *url* and return ``(url, extracted_text, error_message)``.

    An empty ``error_message`` means success. The request is retried up to
    ``RETRY_COUNT`` extra times with a light linear backoff between attempts.
    An HTTP success whose page yields no extractable body text also counts
    as a failure and is retried.
    """
    headers = {"User-Agent": USER_AGENT, "Accept-Language": "ja,en;q=0.9"}
    last_err = ""
    total_attempts = RETRY_COUNT + 1
    for attempt in range(1, total_attempts + 1):
        try:
            resp = requests.get(url, headers=headers, timeout=DEFAULT_TIMEOUT)
            resp.raise_for_status()
            # Prefer the sniffed encoding: servers frequently mislabel charset.
            resp.encoding = resp.apparent_encoding or resp.encoding
            text = extract_main_text(resp.text).strip()
            if text:
                return url, text, ""
            last_err = "本文が抽出できませんでした。"
        except requests.RequestException as e:
            last_err = f"HTTPエラー: {e}"
        except Exception as e:
            last_err = f"解析エラー: {e}"
        # Light linear backoff — but only when another attempt remains.
        # (The original slept after the final attempt too, delaying the
        # error return for no benefit.)
        if attempt < total_attempts:
            time.sleep(0.4 * attempt)
    return url, "", last_err
def extract_main_text(html: str, min_length: int = 200) -> str:
    """Best-effort extraction of the main body text from *html*.

    Strategy — the first result of at least *min_length* characters wins:
      1) <p> tags inside an <article> element
      2) the longest <p> text among common content containers
         (main/section/div whose id or class mentions "content"/"main")
      3) fallback: every <p> in the whole document

    Script/style/nav/footer-like tags are removed up front so their text
    never pollutes the result.
    """
    soup = BeautifulSoup(html, "lxml")
    # Remove tags that never contain body text.
    for tag in soup(["script", "style", "noscript", "svg", "footer", "nav", "form", "aside"]):
        tag.decompose()
    # 1) <article> takes priority when it yields enough text.
    article = soup.find("article")
    if article:
        text = _joined_paragraphs(article)
        if len(text) >= min_length:
            return text
    # 2) Pick the longest paragraph text among likely content containers.
    candidates = soup.select("main, section, div[id*=content], div[class*=content], div[id*=main], div[class*=main]")
    best = ""
    for cand in candidates[:8]:  # cap the work on pathological pages
        t = _joined_paragraphs(cand)
        if len(t) > len(best):
            best = t
    if len(best) >= min_length:
        return best
    # 3) Last resort: every <p> in the document, however short.
    return _joined_paragraphs(soup)

def _joined_paragraphs(node) -> str:
    """Join the cleaned text of every <p> under *node*, one per line."""
    parts = (clean_text(p.get_text(" ", strip=True)) for p in node.find_all("p"))
    return "\n".join(p for p in parts if p)
def clean_text(s: str) -> str:
    """Collapse whitespace in *s*; drop short strings that look like UI chrome.

    A string shorter than 50 characters containing a known noise marker
    (cookie banners, menus, login links, ...) is treated as boilerplate
    and replaced with the empty string.
    """
    collapsed = " ".join(s.split())
    noise = ["同意する", "Cookie", "クッキー", "プライバシー", "メニュー", "検索", "購読", "ログイン"]
    # Hoist the length check: only short fragments are candidates for removal.
    if len(collapsed) < 50 and any(marker in collapsed for marker in noise):
        return ""
    return collapsed
class App(tk.Tk):
    """GUI that fetches a list of URLs concurrently and shows extracted text."""

    def __init__(self):
        super().__init__()
        self.title("URL本文取得ツール")
        self.geometry("1080x720")
        self._build_ui()
        # url -> {"text": str, "error": str}
        self.results = {}

    def _build_ui(self):
        """Build the two-pane layout: URL input left, log + output right."""
        root = ttk.Frame(self)
        root.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
        paned = ttk.Panedwindow(root, orient=tk.HORIZONTAL)
        paned.pack(fill=tk.BOTH, expand=True)
        # Left pane: URL input (one URL per line).
        left = ttk.Frame(paned)
        paned.add(left, weight=1)
        ttk.Label(left, text="URL(1行1件)").pack(anchor="w")
        self.url_text = ScrolledText(left, height=10, wrap=tk.NONE, undo=True)
        self.url_text.pack(fill=tk.BOTH, expand=True, pady=(4, 8))
        btn_row = ttk.Frame(left)
        btn_row.pack(fill=tk.X, pady=(0, 8))
        self.btn_fetch = ttk.Button(btn_row, text="取得開始", command=self.on_fetch)
        self.btn_fetch.pack(side=tk.LEFT)
        self.btn_clear = ttk.Button(btn_row, text="入力クリア", command=lambda: self.url_text.delete("1.0", tk.END))
        self.btn_clear.pack(side=tk.LEFT, padx=6)
        self.progress = ttk.Progressbar(left, mode="determinate", maximum=100)
        self.progress.pack(fill=tk.X)
        # Right pane: log on top, extracted text below.
        right = ttk.Frame(paned)
        paned.add(right, weight=2)
        right_paned = ttk.Panedwindow(right, orient=tk.VERTICAL)
        right_paned.pack(fill=tk.BOTH, expand=True)
        # Log / status area.
        log_frame = ttk.Frame(right_paned)
        right_paned.add(log_frame, weight=1)
        ttk.Label(log_frame, text="ログ / ステータス").pack(anchor="w")
        self.log_text = ScrolledText(log_frame, height=10, wrap=tk.WORD, state=tk.NORMAL)
        self.log_text.pack(fill=tk.BOTH, expand=True, pady=(4, 8))
        # Extracted-text area with save/clear buttons.
        out_frame = ttk.Frame(right_paned)
        right_paned.add(out_frame, weight=2)
        ttk.Label(out_frame, text="抽出テキスト(URLごとに区切り)").pack(anchor="w")
        self.output_text = ScrolledText(out_frame, wrap=tk.WORD)
        self.output_text.pack(fill=tk.BOTH, expand=True, pady=(4, 8))
        save_row = ttk.Frame(out_frame)
        save_row.pack(fill=tk.X)
        ttk.Button(save_row, text="TXT保存", command=self.on_save_txt).pack(side=tk.LEFT)
        ttk.Button(save_row, text="CSV保存(URL,本文)", command=self.on_save_csv).pack(side=tk.LEFT, padx=8)
        ttk.Button(save_row, text="結果クリア", command=self.clear_results).pack(side=tk.LEFT, padx=8)

    def log(self, msg: str):
        """Append *msg* to the log pane. Must be called on the Tk main thread."""
        self.log_text.insert(tk.END, msg + "\n")
        self.log_text.see(tk.END)
        self.update_idletasks()

    def on_fetch(self):
        """Validate the URL list, then fetch on a background thread.

        BUG FIX: the original scheduled the whole fetch loop with
        ``self.after(50, run)``, which runs it synchronously ON the Tk main
        thread — the UI froze until every URL finished, despite the comment
        claiming otherwise. The loop now runs in a daemon thread and every
        widget update is marshalled back via ``self.after`` (tkinter widgets
        are not thread-safe).
        """
        urls = [u.strip() for u in self.url_text.get("1.0", tk.END).splitlines() if u.strip()]
        if not urls:
            messagebox.showwarning("警告", "URLを入力してください。")
            return
        # Lightweight URL sanity check: require a scheme.
        bad = [u for u in urls if not urlparse(u).scheme]
        if bad:
            messagebox.showwarning("警告", "不正なURLがあります:\n- " + "\n- ".join(bad[:5]))
            return
        self.results.clear()
        self.output_text.delete("1.0", tk.END)
        self.log_text.delete("1.0", tk.END)
        self.progress["value"] = 0
        self.progress["maximum"] = len(urls)
        self.btn_fetch.config(state=tk.DISABLED)
        self.log(f"開始: {len(urls)} 件のURLを処理します。")

        def worker():
            # Runs on a background thread; never touch widgets directly here.
            done_count = 0
            chunks = []
            with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
                future_map = {ex.submit(fetch_url, u): u for u in urls}
                for future in concurrent.futures.as_completed(future_map):
                    url = future_map[future]
                    try:
                        u, text, err = future.result()
                        self.results[u] = {"text": text, "error": err}
                        if err:
                            self.after(0, self.log, f"[失敗] {u}\n └ {err}")
                        else:
                            self.after(0, self.log, f"[成功] {u} 文字数: {len(text)}")
                            chunks.append(f"===== {u} =====\n{text}\n\n")
                    except Exception as e:
                        self.results[url] = {"text": "", "error": str(e)}
                        self.after(0, self.log, f"[例外] {url}\n └ {e}")
                    finally:
                        done_count += 1
                        self.after(0, self._set_progress, done_count)
            self.after(0, self._finish_fetch, chunks)

        threading.Thread(target=worker, daemon=True).start()

    def _set_progress(self, value: int):
        """Update the progress bar (main thread only)."""
        self.progress["value"] = value

    def _finish_fetch(self, chunks):
        """Render accumulated text and re-enable the fetch button (main thread)."""
        if chunks:
            self.output_text.insert(tk.END, "".join(chunks))
            self.output_text.see("1.0")
        self.log("完了。")
        self.btn_fetch.config(state=tk.NORMAL)

    def on_save_txt(self):
        """Save the output pane verbatim to a UTF-8 text file."""
        if not self.results:
            messagebox.showinfo("情報", "保存する結果がありません。先に取得してください。")
            return
        path = filedialog.asksaveasfilename(
            defaultextension=".txt", filetypes=[("Text", "*.txt")]
        )
        if not path:
            return
        try:
            combined = self.output_text.get("1.0", tk.END).strip()
            with open(path, "w", encoding="utf-8") as f:
                f.write(combined)
            messagebox.showinfo("成功", f"保存しました:\n{path}")
        except Exception as e:
            messagebox.showerror("エラー", f"保存に失敗しました:\n{e}")

    def on_save_csv(self):
        """Save all results (including failures) as CSV: url, text, error."""
        if not self.results:
            messagebox.showinfo("情報", "保存する結果がありません。先に取得してください。")
            return
        path = filedialog.asksaveasfilename(
            defaultextension=".csv", filetypes=[("CSV", "*.csv")]
        )
        if not path:
            return
        try:
            with open(path, "w", encoding="utf-8", newline="") as f:
                writer = csv.writer(f)
                writer.writerow(["url", "text", "error"])
                for u, d in self.results.items():
                    writer.writerow([u, d.get("text", ""), d.get("error", "")])
            messagebox.showinfo("成功", f"保存しました:\n{path}")
        except Exception as e:
            messagebox.showerror("エラー", f"保存に失敗しました:\n{e}")

    def clear_results(self):
        """Reset stored results, both text panes, and the progress bar."""
        self.results.clear()
        self.output_text.delete("1.0", tk.END)
        self.log_text.delete("1.0", tk.END)
        self.progress["value"] = 0
if __name__ == "__main__":
    # Launch the GUI when run as a script.
    app = App()
    app.mainloop()
# NOTE: the following two lines ("目次" / "コメント") were page-navigation
# artifacts left over from web extraction, not Python — kept here as a
# comment so the file remains valid.