# -*- coding: utf-8 -*-
"""
Naver News Search -> Pandas DataFrame -> CSV 저장 스크립트
요구사항:
1) 사용자 입력: 검색 키워드, 검색량(최대 1000)
2) 입력 키워드로 검색
3) 입력 검색량 만큼 수집 (API 페이지네이션)
4) 결과를 Pandas DataFrame으로
5) DataFrame을 CSV로 저장
6) CSV 파일명은 영문(ASCII)으로 생성
"""
import os
import sys
import time
import json
import re
import html
import math
import urllib.request
import urllib.parse
from datetime import datetime
try:
import pandas as pd
except ImportError:
print("pandas가 필요합니다. `pip install pandas` 로 설치하세요.")
sys.exit(1)
# --- 여기에 본인 API 키 입력 또는 환경변수 사용 ---
CLIENT_ID = os.getenv("NAVER_CLIENT_ID", "YOUR_CLIENT_ID")
CLIENT_SECRET = os.getenv("NAVER_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
API_URL = "https://openapi.naver.com/v1/search/news.json" # JSON 결과
MAX_TOTAL = 1000 # 네이버 검색 최대 1000개
MAX_PER_REQUEST = 100 # display 최대 100
RATE_SLEEP = 0.1 # 호출 간 살짝 쉬기(레이트 제한 여유)
def clean_text(s: str) -> str:
"""네이버 응답의 <b> 태그 제거 및 HTML 엔티티 디코딩"""
if s is None:
return ""
s = html.unescape(s)
s = re.sub(r"<\/?b>", "", s)
s = re.sub(r"<[^>]+>", "", s) # 혹시 남은 태그 제거
return s.strip()
def slugify_ascii(text: str) -> str:
"""영문/숫자/밑줄만 남겨 파일명으로 안전하게 변환"""
text = text.lower()
text = re.sub(r"[^a-z0-9]+", "_", text)
text = re.sub(r"_+", "_", text).strip("_")
return text or "query"
def fetch_naver_news(query: str, total_count: int):
"""쿼리에 대해 total_count개까지 뉴스 수집하여 list[dict] 반환"""
headers = {
"X-Naver-Client-Id": CLIENT_ID,
"X-Naver-Client-Secret": CLIENT_SECRET,
}
total_count = max(1, min(int(total_count), MAX_TOTAL))
pages = math.ceil(total_count / MAX_PER_REQUEST)
results = []
fetched = 0
for i in range(pages):
remaining = total_count - fetched
display = min(MAX_PER_REQUEST, remaining)
start = fetched + 1 # 네이버 start는 1부터
params = {
"query": query,
"display": display,
"start": start,
# 정렬 옵션: sim(정확도), date(날짜). 필요시 변경 가능
"sort": "date",
}
url = f"{API_URL}?{urllib.parse.urlencode(params)}"
req = urllib.request.Request(url, headers=headers)
try:
with urllib.request.urlopen(req) as resp:
if resp.getcode() != 200:
print(f"HTTP {resp.getcode()} 오류 발생")
break
data = json.loads(resp.read().decode("utf-8"))
except Exception as e:
print(f"요청 중 오류: {e}")
break
items = data.get("items", [])
if not items:
# 더 이상 결과가 없으면 중단
break
for item in items:
results.append({
"title": clean_text(item.get("title")),
"originallink": item.get("originallink", ""),
"link": item.get("link", ""),
"description": clean_text(item.get("description")),
"pubDate": item.get("pubDate", ""),
})
fetched += 1
if fetched >= total_count:
break
if fetched >= total_count:
break
time.sleep(RATE_SLEEP)
return results
def main():
# --- 사용자 입력 ---
query = input("검색 키워드를 입력하세요: ").strip()
if not query:
print("검색 키워드는 비어 있을 수 없습니다.")
sys.exit(1)
try:
total = int(input("검색량(최대 1000)을 입력하세요: ").strip())
except ValueError:
print("검색량은 정수여야 합니다.")
sys.exit(1)
# --- API 키 검증(기본값 그대로면 경고) ---
if CLIENT_ID == "YOUR_CLIENT_ID" or CLIENT_SECRET == "YOUR_CLIENT_SECRET":
print("경고: CLIENT_ID/CLIENT_SECRET를 설정하세요. 환경변수 NAVER_CLIENT_ID/NAVER_CLIENT_SECRET 또는 코드 상수로 설정 가능합니다.")
print(f"▶ '{query}' 뉴스 {min(total, MAX_TOTAL)}개까지 수집 중...")
items = fetch_naver_news(query, total)
if not items:
print("결과가 없습니다.")
sys.exit(0)
# --- DataFrame 변환 ---
df = pd.DataFrame(items, columns=["title", "originallink", "link", "description", "pubDate"])
print(f"수집 완료: {len(df)}건")
# --- CSV 저장(영문 파일명) ---
slug = slugify_ascii(query)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"naver_news_{slug}_{ts}.csv"
df.to_csv(filename, index=False, encoding="utf-8-sig")
print(f"CSV 저장 완료: {filename}")
if __name__ == "__main__":
main()