Selenium 기초
1.1 웹드라이버 설정
Selenium을 사용하기 위해서는 먼저 웹드라이버를 설정해야 합니다. 최근 Selenium 4부터는 WebDriver Manager를 통해 자동으로 브라우저 드라이버를 설치할 수 있습니다.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options
# Chrome 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--headless") # 브라우저를 표시하지 않고 실행
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")
# 웹드라이버 설정
driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=chrome_options
)
# 웹사이트 접속
driver.get("https://example.com")
print(driver.title)
# 세션 종료
driver.quit()
1.2 동적 페이지 크롤링
JavaScript로 동적으로 콘텐츠를 로드하는, SPA(Single Page Application) 웹사이트에서는 단순한 HTTP 요청으로는 모든 데이터를 가져올 수 없습니다. Selenium은 실제 브라우저처럼 동작하여 동적 콘텐츠를 렌더링한 후 데이터를 수집할 수 있습니다.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver.get("https://www.example.com/dynamic-content")
try:
# 페이지가 완전히 로드될 때까지 대기 (최대 10초)
WebDriverWait(driver, 10).until(
# ID가 'content-loaded'인 요소가 표시될 때까지 대기
EC.presence_of_element_located((By.ID, "content-loaded"))
)
# 무한 스크롤 페이지 처리
last_height = driver.execute_script("return document.body.scrollHeight")
while True:
# 페이지 끝까지 스크롤
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# 새 콘텐츠가 로드될 때까지 대기
time.sleep(2)
# 새 스크롤 높이 계산
new_height = driver.execute_script("return document.body.scrollHeight")
# 스크롤 높이가 같으면 더 이상 콘텐츠가 없는 것
if new_height == last_height:
break
last_height = new_height
# 동적으로 로드된 모든 항목 가져오기
items = driver.find_elements(By.CSS_SELECTOR, ".item")
for item in items:
title = item.find_element(By.CSS_SELECTOR, ".title").text
description = item.find_element(By.CSS_SELECTOR, ".description").text
print(f"Title: {title}, Description: {description}")
except Exception as e:
print(f"오류 발생: {e}")
finally:
driver.quit()
자동화된 데이터 수집
복잡한 상호작용(클릭, 폼 제출, 드롭다운 선택 등)이 필요한 경우 Selenium의 자동화 기능을 활용할 수 있습니다.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import pandas as pd
import time
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver.get("https://www.example.com/search-form")
try:
# 검색 폼 작성
search_input = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.ID, "search-input"))
)
search_input.send_keys("데이터 사이언스")
# 드롭다운 메뉴에서 옵션 선택
category_dropdown = Select(driver.find_element(By.ID, "category"))
category_dropdown.select_by_visible_text("기술")
# 날짜 범위 설정
date_from = driver.find_element(By.ID, "date-from")
date_from.clear()
date_from.send_keys("2023-01-01")
date_to = driver.find_element(By.ID, "date-to")
date_to.clear()
date_to.send_keys("2023-12-31")
# 검색 버튼 클릭
driver.find_element(By.ID, "search-button").click()
# 결과 로딩 대기
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.CSS_SELECTOR, ".search-results"))
)
# 결과 수집
results = []
result_elements = driver.find_elements(By.CSS_SELECTOR, ".result-item")
for item in result_elements:
result = {
"title": item.find_element(By.CSS_SELECTOR, ".result-title").text,
"date": item.find_element(By.CSS_SELECTOR, ".result-date").text,
"description": item.find_element(By.CSS_SELECTOR, ".result-description").text,
"url": item.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
}
results.append(result)
# 데이터프레임으로 변환
df = pd.DataFrame(results)
print(f"{len(results)}개의 결과를 찾았습니다.")
print(df.head())
# CSV로 저장
df.to_csv("search_results.csv", index=False, encoding="utf-8-sig")
except Exception as e:
print(f"오류 발생: {e}")
finally:
driver.quit()
대규모 데이터 수집
2.1 페이지네이션 처리
여러 페이지에 걸쳐 있는 데이터를 효율적으로 수집하는 방법입니다.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import pandas as pd
import time
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
url = "https://www.example.com/products?page=1"
driver.get(url)
all_products = []
current_page = 1
max_pages = 20 # 안전장치로 최대 페이지 수 설정
try:
while current_page <= max_pages:
print(f"현재 페이지: {current_page}")
# 상품 목록이 로드될 때까지 대기
WebDriverWait(driver, 10).until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".product-item"))
)
# 현재 페이지의 상품 정보 수집
products = driver.find_elements(By.CSS_SELECTOR, ".product-item")
for product in products:
try:
product_data = {
"name": product.find_element(By.CSS_SELECTOR, ".product-name").text,
"price": product.find_element(By.CSS_SELECTOR, ".product-price").text,
"rating": product.find_element(By.CSS_SELECTOR, ".product-rating").text,
"url": product.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
}
all_products.append(product_data)
except NoSuchElementException:
# 일부 요소가 없는 경우 건너뛰기
continue
# 다음 페이지 버튼 찾기
try:
next_button = driver.find_element(By.CSS_SELECTOR, ".pagination .next")
# 다음 페이지 버튼이 비활성화되어 있으면 종료
if "disabled" in next_button.get_attribute("class"):
print("마지막 페이지에 도달했습니다.")
break
# 다음 페이지로 이동
next_button.click()
current_page += 1
# 페이지 전환 대기
time.sleep(2)
except NoSuchElementException:
print("다음 페이지 버튼을 찾을 수 없습니다.")
break
except Exception as e:
print(f"오류 발생: {e}")
finally:
driver.quit()
# 수집한 데이터 처리
df = pd.DataFrame(all_products)
print(f"총 {len(all_products)}개의 상품 정보를 수집했습니다.")
df.to_csv("products.csv", index=False, encoding="utf-8-sig")
2.2 에러 핸들링
대규모 스크래핑에서는 다양한 예외 상황이 발생할 수 있습니다. 안정적인 스크래핑을 위한 에러 핸들링 방법입니다.
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
NoSuchElementException,
TimeoutException,
StaleElementReferenceException,
WebDriverException
)
import pandas as pd
import time
import random
import logging
from requests.exceptions import RequestException
import sys
from urllib3.exceptions import MaxRetryError
import traceback
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("scraping.log"),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
# 재시도 함수
def retry_with_backoff(func, max_tries=5, backoff_factor=0.5):
def wrapper(*args, **kwargs):
tries = 0
while tries < max_tries:
try:
return func(*args, **kwargs)
except (RequestException, WebDriverException, MaxRetryError, TimeoutException) as e:
tries += 1
if tries == max_tries:
logger.error(f"최대 재시도 횟수 초과: {str(e)}")
raise
sleep_time = backoff_factor * (2 ** (tries - 1)) + random.uniform(0, 1)
logger.warning(f"오류 발생, {sleep_time:.2f}초 후 재시도 ({tries}/{max_tries}): {str(e)}")
time.sleep(sleep_time)
return wrapper
# 웹드라이버 초기화 함수
@retry_with_backoff
def init_driver():
chrome_options = webdriver.ChromeOptions()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")
driver = webdriver.Chrome(
service=Service(ChromeDriverManager().install()),
options=chrome_options
)
return driver
# 페이지 로드 함수
@retry_with_backoff
def load_page(driver, url):
driver.get(url)
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
# 요소 찾기 함수
def safe_find_element(driver, by, value, timeout=10):
try:
element = WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((by, value))
)
return element
except (TimeoutException, NoSuchElementException):
logger.warning(f"요소를 찾을 수 없음: {by}={value}")
return None
def safe_find_elements(driver, by, value, timeout=10):
try:
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((by, value))
)
elements = driver.find_elements(by, value)
return elements
except (TimeoutException, NoSuchElementException):
logger.warning(f"요소들을 찾을 수 없음: {by}={value}")
return []
# 텍스트 추출 함수
def safe_get_text(element):
if element is None:
return ""
try:
return element.text.strip()
except StaleElementReferenceException:
logger.warning("StaleElementReferenceException 발생")
return ""
# 속성 추출 함수
def safe_get_attribute(element, attribute):
if element is None:
return ""
try:
return element.get_attribute(attribute)
except (StaleElementReferenceException, WebDriverException):
logger.warning(f"속성 추출 중 오류 발생: {attribute}")
return ""
# 예시: 제품 스크래핑 함수
def scrape_products():
driver = None
results = []
try:
driver = init_driver()
base_url = "https://www.example.com/products?page="
page = 1
max_pages = 100
while page <= max_pages:
url = base_url + str(page)
try:
logger.info(f"페이지 로드 중: {url}")
load_page(driver, url)
# 스크래핑 간 간격 두기 (서버 부하 방지)
time.sleep(random.uniform(1, 3))
products = safe_find_elements(driver, By.CSS_SELECTOR, ".product-item")
if not products:
logger.info("더 이상 제품이 없습니다. 스크래핑 종료.")
break
logger.info(f"현재 페이지에서 {len(products)}개 제품 발견")
for product in products:
try:
name_element = safe_find_element(driver, By.CSS_SELECTOR, ".product-name", timeout=5)
price_element = safe_find_element(driver, By.CSS_SELECTOR, ".product-price", timeout=5)
url_element = safe_find_element(driver, By.CSS_SELECTOR, "a", timeout=5)
product_data = {
"name": safe_get_text(name_element),
"price": safe_get_text(price_element),
"url": safe_get_attribute(url_element, "href")
}
if product_data["name"]: # 이름이 있는 제품만 추가
results.append(product_data)
except Exception as e:
logger.error(f"제품 처리 중 오류: {str(e)}")
continue
# 다음 페이지 확인
next_button = safe_find_element(driver, By.CSS_SELECTOR, ".pagination .next")
if next_button and "disabled" not in safe_get_attribute(next_button, "class"):
next_button.click()
page += 1
else:
logger.info("마지막 페이지에 도달했습니다.")
break
except Exception as e:
logger.error(f"페이지 {page} 처리 중 오류: {str(e)}")
traceback.print_exc()
# 오류가 발생해도 다음 페이지로 진행
page += 1
continue
except Exception as e:
logger.error(f"스크래핑 프로세스 중 심각한 오류: {str(e)}")
traceback.print_exc()
finally:
if driver:
driver.quit()
# 결과 저장
if results:
try:
df = pd.DataFrame(results)
output_file = f"products_{time.strftime('%Y%m%d_%H%M%S')}.csv"
df.to_csv(output_file, index=False, encoding="utf-8-sig")
logger.info(f"총 {len(results)}개의 제품 데이터를 {output_file}에 저장했습니다.")
except Exception as e:
logger.error(f"결과 저장 중 오류: {str(e)}")
# 백업으로 JSON 형식으로 저장 시도
import json
with open("products_backup.json", "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
logger.info("백업 JSON 파일로 저장 완료")
return results
# 메인 실행
if __name__ == "__main__":
try:
scrape_products()
except KeyboardInterrupt:
logger.info("사용자에 의해 스크래핑이 중단되었습니다.")
except Exception as e:
logger.critical(f"예상치 못한 오류 발생: {str(e)}")
traceback.print_exc()
2.3 데이터 저장 최적화
대량의 데이터를 효율적으로 저장하는 방법입니다.
import pandas as pd
import sqlite3
import json
import os
from datetime import datetime
import time
import logging
# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class DataStorage:
def __init__(self, project_name):
self.project_name = project_name
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.data_dir = f"data_{project_name}_{self.timestamp}"
self.chunk_size = 1000 # 청크 크기
self.buffer = [] # 메모리 버퍼
# 데이터 디렉토리 생성
if not os.path.exists(self.data_dir):
os.makedirs(self.data_dir)
logger.info(f"데이터 디렉토리 생성: {self.data_dir}")
# SQLite 데이터베이스 초기화
self.db_path = os.path.join(self.data_dir, f"{project_name}.db")
self.conn = sqlite3.connect(self.db_path)
logger.info(f"SQLite 데이터베이스 생성: {self.db_path}")
def create_table(self, table_name, schema):
"""
SQLite 테이블 생성
Args:
table_name (str): 테이블 이름
schema (dict): 컬럼 이름과 타입을 정의한 딕셔너리
"""
cursor = self.conn.cursor()
columns = ", ".join([f"{col} {dtype}" for col, dtype in schema.items()])
query = f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, {columns})"
cursor.execute(query)
self.conn.commit()
logger.info(f"테이블 생성 완료: {table_name}")
def add_item(self, table_name, item):
"""
데이터 항목 추가 (버퍼에 저장)
Args:
table_name (str): 저장할 테이블 이름
item (dict): 저장할 데이터 항목
"""
self.buffer.append((table_name, item))
# 버퍼 크기가 청크 크기에 도달하면 저장
if len(self.buffer) >= self.chunk_size:
self.save_buffer()
def save_buffer(self):
"""버퍼에 있는 데이터를 DB에 저장"""
if not self.buffer:
return
# 테이블별 데이터 그룹화
grouped_data = {}
for table_name, item in self.buffer:
if table_name not in grouped_data:
grouped_data[table_name] = []
grouped_data[table_name].append(item)
# 각 테이블에 데이터 저장
cursor = self.conn.cursor()
total_saved = 0
for table_name, items in grouped_data.items():
if not items:
continue
# 첫 항목의 컬럼 가져오기
columns = list(items[0].keys())
placeholders = ", ".join(["?"] * len(columns))
column_str = ", ".join(columns)
query = f"INSERT INTO {table_name} ({column_str}) VALUES ({placeholders})"
# 데이터 변환
values = [tuple(item.get(col, None) for col in columns) for item in items]
cursor.executemany(query, values)
total_saved += len(items)
self.conn.commit()
logger.info(f"{total_saved}개 항목을 데이터베이스에 저장했습니다.")
# 버퍼 비우기
self.buffer = []
def save_to_csv(self, table_name, file_name=None):
"""
테이블 데이터를 CSV로 내보내기
Args:
table_name (str): 내보낼 테이블 이름
file_name (str, optional): 저장할 파일 이름
"""
if file_name is None:
file_name = f"{table_name}_{self.timestamp}.csv"
file_path = os.path.join(self.data_dir, file_name)
# 먼저 버퍼에 남은 데이터 저장
self.save_buffer()
# 데이터 조회 및 CSV 저장
query = f"SELECT * FROM {table_name}"
df = pd.read_sql_query(query, self.conn)
df.to_csv(file_path, index=False, encoding="utf-8-sig")
logger.info(f"{len(df)}개 행을 CSV 파일로 저장했습니다: {file_path}")
return file_path
def save_to_json(self, table_name, file_name=None):
"""
테이블 데이터를 JSON으로 내보내기
Args:
table_name (str): 내보낼 테이블 이름
file_name (str, optional): 저장할 파일 이름
"""
if file_name is None:
file_name = f"{table_name}_{self.timestamp}.json"
file_path = os.path.join(self.data_dir, file_name)
# 먼저 버퍼에 남은 데이터 저장
self.save_buffer()
# 데이터 조회 및 JSON 저장
query = f"SELECT * FROM {table_name}"
df = pd.read_sql_query(query, self.conn)
# 데이터프레임을 레코드 방향 딕셔너리 리스트로 변환
records = df.to_dict(orient="records")
with open(file_path, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=2)
logger.info(f"{len(records)}개 레코드를 JSON 파일로 저장했습니다: {file_path}")
return file_path
def close(self):
"""연결된 리소스 정리"""
# 버퍼에 남은 데이터 저장
self.save_buffer()
# 데이터베이스 연결 종료
if self.conn:
self.conn.close()
logger.info("데이터베이스 연결이 종료되었습니다.")
# 사용 예시
def scrape_and_store_example():
# 스토리지 초기화
storage = DataStorage("product_scraper")
# 테이블 스키마 정의
schema = {
"name": "TEXT",
"price": "TEXT",
"description": "TEXT",
"url": "TEXT",
"rating": "REAL",
"review_count": "INTEGER",
"category": "TEXT",
"brand": "TEXT",
"scraped_at": "TIMESTAMP"
}
# 테이블 생성
storage.create_table("products", schema)
try:
# 스크래핑 로직 (간략화)
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
for page in range(1, 10):
logger.info(f"페이지 {page} 스크래핑 중...")
url = f"https://www.example.com/products?page={page}"
driver.get(url)
time.sleep(2)
# 여기서 실제 제품 데이터를 수집하는 코드가 들어갑니다
# 샘플 데이터로 대체
for i in range(20):
product = {
"name": f"제품 {page}-{i}",
"price": f"{random.randint(10000, 100000)}원",
"description": f"제품 {page}-{i}에 대한 설명입니다.",
"url": f"https://www.example.com/product/{page*100+i}",
"rating": round(random.uniform(3.0, 5.0), 1),
"review_count": random.randint(0, 1000),
"category": random.choice(["전자기기", "의류", "식품", "가구"]),
"brand": random.choice(["브랜드A", "브랜드B", "브랜드C"]),
"scraped_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
# 수집한 데이터 저장
storage.add_item("products", product)
# 스크래핑 완료 후 CSV와 JSON으로 내보내기
csv_path = storage.save_to_csv("products")
json_path = storage
'오픈소스를 위한 기초 상식' 카테고리의 다른 글
SQLite 학습 가이드 (0) | 2025.03.24 |
---|---|
실전 웹 스크래핑 프로젝트 학습자료 (0) | 2025.03.23 |
XRDP로 원격 세팅 (0) | 2025.03.20 |
당분간 파이썬? Pandas 관련 정리하기 (0) | 2025.02.18 |
실습: 객체지향 프로그래밍을 활용한 다양한 프로그램 구현 (0) | 2025.02.16 |