이 학습자료는 고급 웹 스크래핑 기술을 활용한 세 가지 실전 프로젝트를 통해 데이터 수집 및 분석 역량을 키우는 데 도움을 줍니다. 각 프로젝트는 단계별로 구성되어 있으며, 실제 활용 가능한 코드와 함께 설명합니다.
목차
1. 뉴스 포털 데이터 수집기
프로젝트 개요
- 목표: 주요 뉴스 포털에서 특정 키워드 관련 뉴스 기사를 수집하고 분석
- 기술 스택: Python, Selenium, BeautifulSoup, pandas, SQLite
- 난이도: 중급
- 소요 시간: 약 3-4시간
학습 목표
- 동적 웹페이지에서 데이터 수집하기
- 페이지네이션 처리하기
- 수집한 데이터를 구조화하여 저장하기
- 간단한 텍스트 분석 수행하기
구현 단계
1.1 프로젝트 설정
먼저 필요한 라이브러리를 설치합니다.
pip install selenium beautifulsoup4 pandas webdriver-manager nltk matplotlib wordcloud
프로젝트 구조를 설정합니다.
news_scraper/
├── news_scraper.py # 메인 스크래핑 스크립트
├── data_processor.py # 데이터 처리 및 분석
├── utils.py # 유틸리티 함수
└── data/ # 수집된 데이터 저장 폴더
1.2 뉴스 스크래퍼 구현
news_scraper.py 파일에 다음 코드를 작성합니다.
import os
import time
import pandas as pd
import sqlite3
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
from bs4 import BeautifulSoup
import logging
import re
import json
import random
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("news_scraper.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class NewsPortalScraper:
def __init__(self, keyword, max_pages=5, headless=True):
self.keyword = keyword
self.max_pages = max_pages
self.headless = headless
self.driver = None
self.articles = []
self.data_dir = "data"
# 데이터 폴더 생성
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
# 데이터베이스 연결
self.db_path = os.path.join(self.data_dir, "news_articles.db")
self.conn = sqlite3.connect(self.db_path)
self.create_tables()
def create_tables(self):
"""데이터베이스 테이블 생성"""
cursor = self.conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS articles (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT,
content TEXT,
url TEXT UNIQUE,
source TEXT,
author TEXT,
published_date TEXT,
scraped_date TEXT,
keyword TEXT
)
''')
self.conn.commit()
def init_driver(self):
"""Selenium 웹드라이버 초기화"""
chrome_options = Options()
if self.headless:
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")
try:
self.driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=chrome_options
)
logger.info("웹드라이버가 성공적으로 초기화되었습니다.")
except Exception as e:
logger.error(f"데이터베이스에서 데이터 로드 중 오류: {str(e)}")
return pd.DataFrame()
def preprocess_text(self, text):
"""텍스트 전처리"""
if not isinstance(text, str):
return ""
# 특수문자 제거
text = re.sub(r'[^\w\s]', ' ', text)
# 숫자 제거
text = re.sub(r'\d+', ' ', text)
# 여러 공백을 하나로 치환
text = re.sub(r'\s+', ' ', text).strip()
return text
def tokenize_text(self, text):
"""텍스트 토큰화"""
if not text:
return []
# 텍스트 전처리
text = self.preprocess_text(text)
# 토큰화
tokens = word_tokenize(text)
# 불용어 제거 & 길이가 1인 토큰 제거
tokens = [token.lower() for token in tokens if token.lower() not in self.stopwords and len(token) > 1]
return tokens
def get_keyword_frequency(self, df, content_column='content', top_n=30):
"""기사 내용에서 키워드 빈도 분석"""
all_tokens = []
for content in df[content_column]:
tokens = self.tokenize_text(content)
all_tokens.extend(tokens)
# 빈도 계산
word_freq = Counter(all_tokens)
# 상위 키워드 추출
top_keywords = word_freq.most_common(top_n)
return top_keywords
def generate_wordcloud(self, word_freq, title='키워드 워드클라우드'):
"""워드클라우드 생성"""
# 워드클라우드용 딕셔너리 생성
word_dict = dict(word_freq)
# 워드클라우드 생성
wordcloud = WordCloud(
font_path='malgun.ttf', # 한글 폰트 경로 (한글 표시를 위해 필요)
width=800,
height=400,
background_color='white',
max_words=100,
relative_scaling=0.5
).generate_from_frequencies(word_dict)
# 시각화
plt.figure(figsize=(10, 6))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.title(title)
# 이미지 저장
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
filename = os.path.join(self.data_dir, f'wordcloud_{timestamp}.png')
plt.savefig(filename, dpi=300, bbox_inches='tight')
logger.info(f"워드클라우드 이미지 저장: {filename}")
return filename
def analyze_news_by_source(self, df):
"""언론사별 기사 분석"""
# 언론사별 기사 수
source_counts = df['source'].value_counts()
# 시각화
plt.figure(figsize=(12, 6))
source_counts.head(10).plot(kind='bar')
plt.title('언론사별 기사 수')
plt.xlabel('언론사')
plt.ylabel('기사 수')
plt.xticks(rotation=45)
plt.tight_layout()
# 이미지 저장
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
filename = os.path.join(self.data_dir, f'news_by_source_{timestamp}.png')
plt.savefig(filename, dpi=300, bbox_inches='tight')
logger.info(f"언론사별 기사 분석 이미지 저장: {filename}")
return filename
def analyze_news_over_time(self, df):
"""시간에 따른 기사 수 분석"""
# 날짜 형식이 다양할 수 있으므로 먼저 발행일자를 파싱 시도
try:
# 발행일자 컬럼 형식에 따른 처리
df['date'] = pd.to_datetime(df['published_date'], errors='coerce')
# 결측치 제거
df_date = df.dropna(subset=['date'])
if len(df_date) > 0:
# 날짜별로 그룹화
date_counts = df_date.groupby(df_date['date'].dt.date).size()
# 시각화
plt.figure(figsize=(12, 6))
date_counts.plot(kind='line', marker='o')
plt.title('날짜별 기사 수')
plt.xlabel('날짜')
plt.ylabel('기사 수')
plt.grid(True, alpha=0.3)
plt.tight_layout()
# 이미지 저장
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
filename = os.path.join(self.data_dir, f'news_over_time_{timestamp}.png')
plt.savefig(filename, dpi=300, bbox_inches='tight')
logger.info(f"시간에 따른 기사 분석 이미지 저장: {filename}")
return filename
else:
logger.warning("유효한 날짜 데이터가 없습니다.")
return None
except Exception as e:
logger.error(f"시간에 따른 기사 분석 중 오류: {str(e)}")
return None
def generate_report(self, keyword=None):
"""분석 보고서 생성"""
# 데이터 로드
df = self.load_data_from_db(keyword)
if df.empty:
logger.warning("분석할 데이터가 없습니다.")
return None
# 분석 결과를 저장할 딕셔너리
report = {
'keyword': keyword if keyword else '전체',
'total_articles': len(df),
'sources': df['source'].value_counts().to_dict(),
'date_range': {
'start': df['scraped_date'].min(),
'end': df['scraped_date'].max()
}
}
# 키워드 빈도 분석
top_keywords = self.get_keyword_frequency(df)
report['top_keywords'] = dict(top_keywords)
# 워드클라우드 생성
wordcloud_path = self.generate_wordcloud(top_keywords, f"'{keyword}' 관련 키워드" if keyword else "전체 뉴스 키워드")
report['wordcloud_path'] = wordcloud_path
# 언론사별 기사 분석
source_analysis_path = self.analyze_news_by_source(df)
report['source_analysis_path'] = source_analysis_path
# 시간에 따른 기사 분석
time_analysis_path = self.analyze_news_over_time(df)
report['time_analysis_path'] = time_analysis_path
# 보고서 저장
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S')
report_file = os.path.join(self.data_dir, f'news_report_{keyword if keyword else "all"}_{timestamp}.json')
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
logger.info(f"분석 보고서 저장: {report_file}")
return report_file
def close(self):
"""리소스 정리"""
if self.conn:
self.conn.close()
# 메인 실행 코드
if __name__ == "__main__":
try:
processor = NewsDataProcessor()
# 검색할 키워드 입력 (선택 사항)
keyword = input("분석할 키워드를 입력하세요 (모든 기사를 분석하려면 Enter): ").strip() or None
# 분석 보고서 생성
report_file = processor.generate_report(keyword)
if report_file:
print(f"분석 보고서가 생성되었습니다: {report_file}")
else:
print("분석 보고서 생성에 실패했습니다.")
except Exception as e:
print(f"오류 발생: {str(e)}")
finally:
if 'processor' in locals():
processor.close()
1.4 유틸리티 함수
utils.py 파일에 다음 코드를 작성합니다.
import time
import random
import logging
from selenium.common.exceptions import TimeoutException, NoSuchElementException, StaleElementReferenceException
logger = logging.getLogger(__name__)
def retry_with_backoff(func, max_tries=5, backoff_factor=0.5):
"""
지수 백오프를 사용한 재시도 데코레이터 함수
Args:
func: 재시도할 함수
max_tries: 최대 재시도 횟수
backoff_factor: 백오프 계수
Returns:
함수의 결과
"""
def wrapper(*args, **kwargs):
tries = 0
while tries < max_tries:
try:
return func(*args, **kwargs)
except (TimeoutException, NoSuchElementException, StaleElementReferenceException) as e:
tries += 1
if tries == max_tries:
logger.error(f"최대 재시도 횟수 초과: {str(e)}")
raise
sleep_time = backoff_factor * (2 ** (tries - 1)) + random.uniform(0, 1)
logger.warning(f"오류 발생, {sleep_time:.2f}초 후 재시도 ({tries}/{max_tries}): {str(e)}")
time.sleep(sleep_time)
return wrapper
def safe_get_text(element):
"""
안전하게 요소의 텍스트 가져오기
Args:
element: 웹 요소
Returns:
요소의 텍스트 또는 빈 문자열
"""
if element is None:
return ""
try:
return element.text.strip()
except (StaleElementReferenceException, AttributeError):
return ""
def safe_get_attribute(element, attribute):
"""
안전하게 요소의 속성 가져오기
Args:
element: 웹 요소
attribute: 속성 이름
Returns:
요소의 속성 값 또는 빈 문자열
"""
if element is None:
return ""
try:
return element.get_attribute(attribute) or ""
except (StaleElementReferenceException, AttributeError):
return ""
def format_filename(filename):
"""
파일명에 사용할 수 없는 문자 처리
Args:
filename: 원본 파일명
Returns:
유효한 파일명
"""
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
filename = filename.replace(char, '_')
return filename
학습 포인트
- 동적 웹 페이지 처리:
- 네이버와 다음 뉴스는 검색 결과가 동적으로 로드됩니다.
- Selenium을 사용하여 페이지를 완전히 렌더링한 후 데이터를 수집합니다.
- 페이지네이션 처리:
- 여러 페이지에 걸친 검색 결과를 순회하며 데이터를 수집합니다.
- 다음 페이지 버튼을 찾아 클릭하는 방식으로 페이지 전환을 자동화합니다.
- 에러 처리 및 안정성:
- 재시도 메커니즘과 예외 처리를 통해 스크래핑의 안정성을 높입니다.
- 중복 데이터 방지를 위한 URL 기반 필터링을 구현합니다.
- 데이터 저장 및 관리:
- SQLite 데이터베이스를 사용해 구조화된 데이터 저장
- CSV 및 JSON 형식으로 데이터 내보내기 기능 제공
- 데이터 분석 및 시각화:
- 키워드 빈도 분석 및 워드클라우드 생성
- 언론사별, 시간별 기사 분포 분석
2. SNS 댓글 분석기
프로젝트 개요
- 목표: 유튜브 영상의 댓글을 수집하고 감성 분석 수행
- 기술 스택: Python, Selenium, Google API, NLTK, TextBlob
- 난이도: 중급
- 소요 시간: 약 4-5시간
학습 목표
- API와 웹 스크래핑을 조합하여 데이터 수집하기
- 댓글 데이터에 감성 분석 적용하기
- 결과를 시각적으로 표현하기
구현 단계
2.1 프로젝트 설정
필요한 라이브러리를 설치합니다.
pip install selenium google-api-python-client google-auth-httplib2 google-auth-oauthlib pandas nltk textblob matplotlib seaborn wordcloud
프로젝트 구조를 설정합니다.
youtube_comment_analyzer/
├── comment_scraper.py # 유튜브 댓글 수집
├── sentiment_analyzer.py # 감성 분석 수행
├── visualizer.py # 데이터 시각화
├── main.py # 메인 실행 파일
├── config.py # 설정 파일
└── data/ # 수집된 데이터 저장 폴더
2.2 댓글 수집기 구현
comment_scraper.py 파일에 다음 코드를 작성합니다.
import os
import re
import time
import pandas as pd
from datetime import datetime
import googleapiclient.discovery
from googleapiclient.errors import HttpError
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
from webdriver_manager.chrome import ChromeDriverManager
import logging
import json
import sqlite3
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("youtube_scraper.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class YoutubeCommentScraper:
def __init__(self, api_key=None, use_api=True, data_dir="data"):
self.api_key = api_key
self.use_api = use_api and api_key is not None
self.driver = None
self.youtube = None
self.data_dir = data_dir
# 데이터 폴더 생성
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
# 데이터베이스 연결
self.db_path = os.path.join(self.data_dir, "youtube_comments.db")
self.conn = sqlite3.connect(self.db_path)
self.create_tables()
# API 또는 Selenium 초기화
if self.use_api:
self.init_youtube_api()
else:
self.init_driver()
def create_tables(self):
"""데이터베이스 테이블 생성"""
cursor = self.conn.cursor()
# 비디오 정보 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS videos (
video_id TEXT PRIMARY KEY,
title TEXT,
channel TEXT,
publish_date TEXT,
view_count INTEGER,
like_count INTEGER,
comment_count INTEGER,
scraped_date TEXT
)
''')
# 댓글 테이블
cursor.execute('''
CREATE TABLE IF NOT EXISTS comments (
id INTEGER PRIMARY KEY AUTOINCREMENT,
video_id TEXT,
comment_id TEXT UNIQUE,
author TEXT,
text TEXT,
published_at TEXT,
like_count INTEGER,
reply_count INTEGER,
scraped_date TEXT,
FOREIGN KEY (video_id) REFERENCES videos (video_id)
)
''')
self.conn.commit()
def init_youtube_api(self):
"""YouTube API 클라이언트 초기화"""
try:
api_service_name = "youtube"
api_version = "v3"
self.youtube = googleapiclient.discovery.build(
api_service_name, api_version, developerKey=self.api_key)
logger.info("YouTube API 클라이언트가 성공적으로 초기화되었습니다.")
except Exception as e:
logger.error(f"YouTube API 초기화 중 오류: {str(e)}")
self.use_api = False
self.init_driver() # API 초기화 실패 시 Selenium으로 대체
def init_driver(self):
"""Selenium 웹드라이버 초기화"""
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")
try:
self.driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=chrome_options
)
logger.info("Selenium 웹드라이버가 성공적으로 초기화되었습니다.")
except Exception as e:
logger.error(f"Selenium 웹드라이버 초기화 중 오류: {str(e)}")
raise
def close_driver(self):
"""드라이버 종료"""
if self.driver:
self.driver.quit()
self.driver = None
def extract_video_id(self, url):
"""URL에서 YouTube 비디오 ID 추출"""
# YouTube URL 패턴
# https://www.youtube.com/watch?v=VIDEO_ID
# https://youtu.be/VIDEO_ID
# https://www.youtube.com/embed/VIDEO_ID
video_id = None
if "youtube.com/watch" in url:
video_id = re.search(r'v=([^&]+)', url)
elif "youtu.be/" in url:
video_id = re.search(r'youtu\.be/([^?]+)', url)
elif "youtube.com/embed/" in url:
video_id = re.search(r'embed/([^?]+)', url)
return video_id.group(1) if video_id else None
def get_video_info_api(self, video_id):
"""YouTube API를 사용하여 비디오 정보 가져오기"""
try:
request = self.youtube.videos().list(
part="snippet,statistics",
id=video_id
)
response = request.execute()
if not response['items']:
logger.warning(f"비디오 ID {video_id}에 대한 정보를 찾을 수 없습니다.")
return None
video_data = response['items'][0]
snippet = video_data['snippet']
statistics = video_data['statistics']
video_info = {
'video_id': video_id,
'title': snippet['title'],
'channel': snippet['channelTitle'],
'publish_date': snippet['publishedAt'],
'view_count': int(statistics.get('viewCount', 0)),
'like_count': int(statistics.get('likeCount', 0)),
'comment_count': int(statistics.get('commentCount', 0)),
'scraped_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
return video_info
except HttpError as e:
logger.error(f"API 요청 중 HttpError 발생: {str(e)}")
return None
except Exception as e:
logger.error(f"비디오 정보 조회 중 오류: {str(e)}")
return None
def get_video_info_selenium(self, video_id):
"""Selenium을 사용하여 비디오 정보 가져오기"""
url = f"https://www.youtube.com/watch?v={video_id}"
try:
self.driver.get(url)
time.sleep(3) # 페이지 로딩 대기
# 제목 가져오기
title_element = WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "h1.title"))
)
title = title_element.text
# 채널명 가져오기
channel_element = self.driver.find_element(By.CSS_SELECTOR, "div#owner #channel-name")
channel = channel_element.text
# 조회수 가져오기
view_count_text = self.driver.find_element(By.CSS_SELECTOR, "span.view-count").text
view_count = int(re.sub(r'[^0-9]', '', view_count_text))
# 좋아요 수 가져오기
like_count_text = self.driver.find_element(By.CSS_SELECTOR, "div#top-level-buttons-computed button:first-child").get_attribute("aria-label")
like_count = int(re.sub(r'[^0-9]', '', like_count_text)) if like_count_text else 0
# 댓글 수 가져오기 (있을 경우)
try:
comment_count_text = self.driver.find_element(By.CSS_SELECTOR, "h2#count").text
comment_count = int(re.sub(r'[^0-9]', '', comment_count_text))
except NoSuchElementException:
comment_count = 0
video_info = {
'video_id': video_id,
'title': title,
'channel': channel,
'publish_date': "알 수 없음", # Selenium으로는 정확한 날짜를 가져오기 어려움
'view_count': view_count,
'like_count': like_count,
'comment_count': comment_count,
'scraped_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
return video_info
except Exception as e:
logger.error(f"Selenium으로 비디오 정보 조회 중 오류: {str(e)}")
return None
def get_comments_api(self, video_id, max_comments=100):
"""YouTube API를 사용하여 댓글 가져오기"""
comments = []
next_page_token = None
try:
while len(comments) < max_comments:
request = self.youtube.commentThreads().list(
part="snippet,replies",
videoId=video_id,
maxResults=min(100, max_comments - len(comments)),
pageToken=next_page_token
)
response = request.execute()
for item in response['items']:
comment = item['snippet']['topLevelComment']['snippet']
comment_data = {
'video_id': video_id,
'comment_id': item['id'],
'author': comment['authorDisplayName'],
'text': comment['textDisplay'],
'published_at': comment['publishedAt'],
'like_count': comment['likeCount'],
'reply_count': item['snippet']['totalReplyCount'],
'scraped_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
comments.append(comment_data)
# 답글 가져오기 (있는 경우)
if 'replies' in item and item['snippet']['totalReplyCount'] > 0:
for reply in item['replies']['comments']:
reply_snippet = reply['snippet']
reply_data = {
'video_id': video_id,
'comment_id': reply['id'],
'author': reply_snippet['authorDisplayName'],
'text': reply_snippet['textDisplay'],
'published_at': reply_snippet['publishedAt'],
'like_count': reply_snippet['likeCount'],
'reply_count': 0, # 답글은 추가 답글이 없음
'scraped_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
comments.append(reply_data)
if len(comments) >= max_comments:
break
if len(comments) >= max_comments:
break
# 다음 페이지 토큰 확인
next_page_token = response.get('nextPageToken')
if not next_page_token:
break
logger.info(f"API를 통해 {len(comments)}개의 댓글을 가져왔습니다.")
return comments
except HttpError as e:
logger.error(f"API 요청 중 HttpError 발생: {str(e)}")
return []
except Exception as e:
logger.error(f"댓글 가져오기 중 오류: {str(e)}")
return []
def get_comments_selenium(self, video_id, max_comments=100):
"""Selenium을 사용하여 댓글 가져오기"""
url = f"https://www.youtube.com/watch?v={video_id}"
comments = []
try:
self.driver.get(url)
time.sleep(3) # 페이지 로딩 대기
# 댓글 섹션으로 스크롤
self.driver.execute_script("window.scrollBy(0, 500);")
time.sleep(2)
# 댓글이 로드될 때까지 대기
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "ytd-comments#comments"))
)
except TimeoutException:
logger.warning("댓글을 찾을 수 없습니다. 댓글이 비활성화되었을 수 있습니다.")
return []
# 댓글 로드를 위해 스크롤 다운
last_height = self.driver.execute_script("return document.documentElement.scrollHeight")
comment_count = 0
while comment_count < max_comments:
# 페이지 끝까지 스크롤
self.driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
time.sleep(2)
# 새 스크롤 높이 계산
new_height = self.driver.execute_script("return document.documentElement.scrollHeight")
# 댓글 요소 가져오기
comment_elements = self.driver.find_elements(By.CSS_SELECTOR, "ytd-comment-thread-renderer")
comment_count = len(comment_elements)
logger.info(f"현재 {comment_count}개의 댓글을 로드했습니다.")
# 더 이상 스크롤이 f"웹드라이버 초기화 중 오류 발생: {str(e)}")
raise
def close_driver(self):
"""드라이버 종료"""
if self.driver:
self.driver.quit()
self.driver = None
def scrape_naver_news(self):
"""네이버 뉴스 스크래핑"""
if not self.driver:
self.init_driver()
base_url = f"https://search.naver.com/search.naver?where=news&query={self.keyword}&sm=tab_opt&sort=1"
self.driver.get(base_url)
logger.info(f"네이버 뉴스 검색 시작: 키워드 '{self.keyword}'")
current_page = 1
while current_page <= self.max_pages:
logger.info(f"페이지 {current_page} 스크래핑 중...")
try:
# 뉴스 목록이 로드될 때까지 대기
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "ul.list_news"))
)
# 현재 페이지의 뉴스 목록 가져오기
news_list = self.driver.find_elements(By.CSS_SELECTOR, "ul.list_news > li")
logger.info(f"현재 페이지에서 {len(news_list)}개의 뉴스 항목 발견")
for news_item in news_list:
try:
# 뉴스 제목과 링크 추출
title_element = news_item.find_element(By.CSS_SELECTOR, "a.news_tit")
title = title_element.text
url = title_element.get_attribute("href")
# 언론사 추출
try:
source = news_item.find_element(By.CSS_SELECTOR, "a.info.press").text
except NoSuchElementException:
source = "알 수 없음"
# 기사 요약 추출
try:
summary = news_item.find_element(By.CSS_SELECTOR, "a.api_txt_lines.dsc_txt_wrap").text
except NoSuchElementException:
summary = ""
# 날짜 정보 추출
try:
date_info = news_item.find_element(By.CSS_SELECTOR, "span.info").text
# 정규표현식으로 날짜 형식 추출 (예: '1시간 전', '2023.10.15.')
date_match = re.search(r'\d{4}\.\d{2}\.\d{2}\.|\d+분 전|\d+시간 전|어제', date_info)
published_date = date_match.group(0) if date_match else "알 수 없음"
except NoSuchElementException:
published_date = "알 수 없음"
# 기사 정보 저장
article = {
"title": title,
"content": summary, # 기본 요약 정보
"url": url,
"source": source,
"author": "알 수 없음", # 목록에서는 저자 정보 없음
"published_date": published_date,
"scraped_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"keyword": self.keyword
}
# 중복 검사 후 기사 추가
if not self.is_duplicate(url):
self.articles.append(article)
self.save_article(article)
except Exception as e:
logger.warning(f"뉴스 항목 처리 중 오류: {str(e)}")
continue
# 다음 페이지 버튼 찾기 및 클릭
if current_page < self.max_pages:
try:
# 페이지 번호 버튼 또는 다음 페이지 버튼 클릭
next_page_button = None
# 현재 페이지 그룹에서 다음 페이지 찾기
page_buttons = self.driver.find_elements(By.CSS_SELECTOR, "div.sc_page_inner a.btn")
for button in page_buttons:
if button.text.isdigit() and int(button.text) == current_page + 1:
next_page_button = button
break
# 다음 페이지 그룹으로 이동하는 버튼 찾기
if not next_page_button:
next_group_button = self.driver.find_element(By.CSS_SELECTOR, "a.btn_next")
if next_group_button:
next_group_button.click()
time.sleep(2)
# 그룹 이동 후 첫 번째 페이지 선택
page_buttons = self.driver.find_elements(By.CSS_SELECTOR, "div.sc_page_inner a.btn")
if page_buttons:
next_page_button = page_buttons[0]
if next_page_button:
next_page_button.click()
current_page += 1
time.sleep(random.uniform(1.5, 3)) # 서버 부하 방지를 위한 대기
else:
logger.info("더 이상 페이지가 없습니다.")
break
except Exception as e:
logger.warning(f"다음 페이지 이동 중 오류: {str(e)}")
break
else:
break
except Exception as e:
logger.error(f"페이지 {current_page} 처리 중 오류: {str(e)}")
break
logger.info(f"네이버 뉴스 스크래핑 완료. 총 {len(self.articles)}개의 기사를 수집했습니다.")
def scrape_daum_news(self):
"""다음 뉴스 스크래핑"""
if not self.driver:
self.init_driver()
base_url = f"https://search.daum.net/search?w=news&q={self.keyword}&sort=recency"
self.driver.get(base_url)
logger.info(f"다음 뉴스 검색 시작: 키워드 '{self.keyword}'")
current_page = 1
while current_page <= self.max_pages:
logger.info(f"페이지 {current_page} 스크래핑 중...")
try:
# 뉴스 목록이 로드될 때까지 대기
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "ul.list_news"))
)
# 현재 페이지의 뉴스 목록 가져오기
news_list = self.driver.find_elements(By.CSS_SELECTOR, "ul.list_news > li.type_news")
logger.info(f"현재 페이지에서 {len(news_list)}개의 뉴스 항목 발견")
for news_item in news_list:
try:
# 뉴스 제목과 링크 추출
title_element = news_item.find_element(By.CSS_SELECTOR, "a.tit_main")
title = title_element.text
url = title_element.get_attribute("href")
# 언론사 추출
try:
source = news_item.find_element(By.CSS_SELECTOR, "span.txt_info").text
except NoSuchElementException:
source = "알 수 없음"
# 기사 요약 추출
try:
summary = news_item.find_element(By.CSS_SELECTOR, "p.desc").text
except NoSuchElementException:
summary = ""
# 날짜 정보 추출
try:
date_info = news_item.find_element(By.CSS_SELECTOR, "span.txt_info:nth-child(3)").text
published_date = date_info
except NoSuchElementException:
published_date = "알 수 없음"
# 기사 정보 저장
article = {
"title": title,
"content": summary,
"url": url,
"source": source,
"author": "알 수 없음",
"published_date": published_date,
"scraped_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"keyword": self.keyword
}
# 중복 검사 후 기사 추가
if not self.is_duplicate(url):
self.articles.append(article)
self.save_article(article)
except Exception as e:
logger.warning(f"뉴스 항목 처리 중 오류: {str(e)}")
continue
# 다음 페이지 버튼 찾기 및 클릭
if current_page < self.max_pages:
try:
# 다음 페이지 버튼
pagination = self.driver.find_element(By.CSS_SELECTOR, "div.paging_comm")
next_page_buttons = pagination.find_elements(By.TAG_NAME, "a")
next_clicked = False
for button in next_page_buttons:
if button.text.isdigit() and int(button.text) == current_page + 1:
button.click()
current_page += 1
next_clicked = True
time.sleep(random.uniform(1.5, 3))
break
if not next_clicked:
# 다음 버튼 찾기
next_button = self.driver.find_element(By.CSS_SELECTOR, "span.ico_comm.ico_next")
if next_button and next_button.is_displayed():
next_button.click()
current_page += 1
time.sleep(random.uniform(1.5, 3))
else:
logger.info("더 이상 페이지가 없습니다.")
break
except Exception as e:
logger.warning(f"다음 페이지 이동 중 오류: {str(e)}")
break
else:
break
except Exception as e:
logger.error(f"페이지 {current_page} 처리 중 오류: {str(e)}")
break
logger.info(f"다음 뉴스 스크래핑 완료. 총 {len(self.articles)}개의 기사를 수집했습니다.")
def scrape_full_articles(self, sample_size=10):
"""
수집한 기사의 전체 내용을 스크래핑
(모든 기사를 처리할 경우 시간이 오래 걸리므로 샘플 크기 지정)
"""
if not self.articles:
logger.warning("스크래핑된 기사가 없습니다.")
return
if not self.driver:
self.init_driver()
# 샘플 기사 선택
if sample_size > 0 and sample_size < len(self.articles):
sample_articles = random.sample(self.articles, sample_size)
else:
sample_articles = self.articles
logger.info(f"{len(sample_articles)}개 기사의 전체 내용을 수집합니다.")
for idx, article in enumerate(sample_articles):
try:
logger.info(f"기사 {idx+1}/{len(sample_articles)} 처리 중: {article['title'][:30]}...")
self.driver.get(article['url'])
time.sleep(random.uniform(2, 4)) # 페이지 로딩 대기
# 네이버 뉴스 본문 추출
if "naver.com" in article['url']:
try:
# 본문 내용이 로드될 때까지 대기
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.ID, "dic_area"))
)
# 본문 내용 추출
content_element = self.driver.find_element(By.ID, "dic_area")
content = content_element.text
# 작성자 정보 추출 (있는 경우)
try:
author = self.driver.find_element(By.CSS_SELECTOR, "span.byline_s").text
except NoSuchElementException:
author = article['author'] # 기존 값 유지
except Exception as e:
logger.warning(f"네이버 뉴스 본문 추출 중 오류: {str(e)}")
continue
# 다음 뉴스 본문 추출
elif "daum.net" in article['url']:
try:
# 본문 내용이 로드될 때까지 대기
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "div.article_view"))
)
# 본문 내용 추출
content_element = self.driver.find_element(By.CSS_SELECTOR, "div.article_view")
content = content_element.text
# 작성자 정보 추출 (있는 경우)
try:
author = self.driver.find_element(By.CSS_SELECTOR, "span.txt_info").text
except NoSuchElementException:
author = article['author'] # 기존 값 유지
except Exception as e:
logger.warning(f"다음 뉴스 본문 추출 중 오류: {str(e)}")
continue
# 그 외 뉴스 사이트
else:
logger.info(f"지원하지 않는 뉴스 사이트: {article['url']}")
continue
# 수집한 본문 및 작성자 정보 업데이트
article['content'] = content
article['author'] = author
# 데이터베이스 업데이트
self.update_article_content(article)
except Exception as e:
logger.error(f"기사 내용 수집 중 오류: {str(e)}")
continue
logger.info("기사 전체 내용 수집 완료")
def is_duplicate(self, url):
"""URL이 이미 데이터베이스에 있는지 확인"""
cursor = self.conn.cursor()
cursor.execute("SELECT id FROM articles WHERE url = ?", (url,))
return cursor.fetchone() is not None
def save_article(self, article):
"""기사 정보를 데이터베이스에 저장"""
cursor = self.conn.cursor()
try:
cursor.execute('''
INSERT INTO articles (title, content, url, source, author, published_date, scraped_date, keyword)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
article['title'],
article['content'],
article['url'],
article['source'],
article['author'],
article['published_date'],
article['scraped_date'],
article['keyword']
))
self.conn.commit()
except sqlite3.IntegrityError:
logger.warning(f"중복된 URL: {article['url']}")
except Exception as e:
logger.error(f"기사 저장 중 오류: {str(e)}")
def update_article_content(self, article):
"""기사 본문 및 작성자 정보 업데이트"""
cursor = self.conn.cursor()
try:
cursor.execute('''
UPDATE articles
SET content = ?, author = ?
WHERE url = ?
''', (article['content'], article['author'], article['url']))
self.conn.commit()
except Exception as e:
logger.error(f"기사 업데이트 중 오류: {str(e)}")
def export_to_csv(self):
"""수집한 기사를 CSV 파일로 내보내기"""
if not self.articles:
logger.warning("내보낼 기사가 없습니다.")
return None
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_filename = os.path.join(self.data_dir, f"news_{self.keyword}_{timestamp}.csv")
try:
df = pd.DataFrame(self.articles)
df.to_csv(csv_filename, index=False, encoding='utf-8-sig')
logger.info(f"CSV 파일로 내보내기 완료: {csv_filename}")
return csv_filename
except Exception as e:
logger.error(f"CSV 내보내기 중 오류: {str(e)}")
return None
def export_to_json(self):
"""수집한 기사를 JSON 파일로 내보내기"""
if not self.articles:
logger.warning("내보낼 기사가 없습니다.")
return None
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
json_filename = os.path.join(self.data_dir, f"news_{self.keyword}_{timestamp}.json")
try:
with open(json_filename, 'w', encoding='utf-8') as f:
json.dump(self.articles, f, ensure_ascii=False, indent=2)
logger.info(f"JSON 파일로 내보내기 완료: {json_filename}")
return json_filename
except Exception as e:
logger.error(f"JSON 내보내기 중 오류: {str(e)}")
return None
def close(self):
"""리소스 정리"""
self.close_driver()
if self.conn:
self.conn.close()
# 메인 실행 코드
if __name__ == "__main__":
try:
keyword = input("검색할 키워드를 입력하세요: ")
max_pages = int(input("수집할 최대 페이지 수를 입력하세요 (기본값: 5): ") or "5")
scraper = NewsPortalScraper(keyword, max_pages)
# 네이버 뉴스 스크래핑
scraper.scrape_naver_news()
# 다음 뉴스 스크래핑
scraper.scrape_daum_news()
# 전체 기사 내용 스크래핑 (샘플)
sample_size = int(input("전체 내용을 수집할 기사 수를 입력하세요 (기본값: 5): ") or "5")
scraper.scrape_full_articles(sample_size)
# 결과 내보내기
csv_path = scraper.export_to_csv()
json_path = scraper.export_to_json()
print(f"스크래핑 완료! 총 {len(scraper.articles)}개의 기사를 수집했습니다.")
if csv_path:
print(f"CSV 파일: {csv_path}")
if json_path:
print(f"JSON 파일: {json_path}")
except KeyboardInterrupt:
print("\n사용자에 의해 프로그램이 중단되었습니다.")
except Exception as e:
print(f"오류 발생: {str(e)}")
finally:
if 'scraper' in locals():
scraper.close()
1.3 데이터 처리 및 분석
data_processor.py 파일에 다음 코드를 작성합니다.
import os
import pandas as pd
import sqlite3
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from collections import Counter
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import re
import logging
import json
# NLTK 필요 데이터 다운로드
nltk.download('punkt')
nltk.download('stopwords')
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class NewsDataProcessor:
def __init__(self, db_path="data/news_articles.db"):
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self.data_dir = os.path.dirname(db_path)
# 한국어 불용어 설정
self.stopwords = set(stopwords.words('english'))
# 한국어 불용어 추가
korean_stopwords = [
'있다', '하다', '이다', '되다', '않다', '그', '그녀', '그들', '이', '저', '것',
'이런', '저런', '어떤', '무슨', '위해', '통해', '위한', '때문', '그것', '이것',
'저것', '이번', '요', '를', '을', '에', '에서', '의', '으로', '로', '에게', '뉴스',
'기자', '데', '및', '또는', '또', '등', '함께', '오늘', '어제', '지금'
]
self.stopwords.update(korean_stopwords)
def load_data_from_db(self, keyword=None):
"""데이터베이스에서 기사 데이터 로드"""
query = "SELECT * FROM articles"
params = []
if keyword:
query += " WHERE keyword = ?"
params.append(keyword)
try:
df = pd.read_sql_query(query, self.conn, params=params)
logger.info(f"데이터베이스에서 {len(df)}개의 기사를 로드했습니다.")
return df
except Exception as e:
logger.error(
'오픈소스를 위한 기초 상식' 카테고리의 다른 글
SQLite 기반 자동화 시스템 구축 가이드 (0) | 2025.03.25 |
---|---|
SQLite 학습 가이드 (0) | 2025.03.24 |
고급 웹 스크래핑 가이드 (0) | 2025.03.22 |
XRDP로 원격 세팅 (0) | 2025.03.20 |
당분간 파이썬? Pandas 관련 정리하기 (0) | 2025.02.18 |