본문 바로가기
오픈소스를 위한 기초 상식

고급 웹 스크래핑 가이드

by 지나가는 프로도 2025. 3. 22.

Selenium 기초

1.1 웹드라이버 설정

Selenium을 사용하기 위해서는 먼저 웹드라이버를 설정해야 합니다. 최근 Selenium 4부터는 WebDriver Manager를 통해 자동으로 브라우저 드라이버를 설치할 수 있습니다.

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.chrome.options import Options

# Chrome 옵션 설정
chrome_options = Options()
chrome_options.add_argument("--headless")  # 브라우저를 표시하지 않고 실행
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--window-size=1920,1080")
chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")

# 웹드라이버 설정
driver = webdriver.Chrome(
    service=Service(ChromeDriverManager().install()),
    options=chrome_options
)

# 웹사이트 접속
driver.get("https://example.com")
print(driver.title)

# 세션 종료
driver.quit()

1.2 동적 페이지 크롤링

JavaScript로 동적으로 콘텐츠를 로드하는, SPA(Single Page Application) 웹사이트에서는 단순한 HTTP 요청으로는 모든 데이터를 가져올 수 없습니다. Selenium은 실제 브라우저처럼 동작하여 동적 콘텐츠를 렌더링한 후 데이터를 수집할 수 있습니다.

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver.get("https://www.example.com/dynamic-content")

try:
    # 페이지가 완전히 로드될 때까지 대기 (최대 10초)
    WebDriverWait(driver, 10).until(
        # ID가 'content-loaded'인 요소가 표시될 때까지 대기
        EC.presence_of_element_located((By.ID, "content-loaded"))
    )
    
    # 무한 스크롤 페이지 처리
    last_height = driver.execute_script("return document.body.scrollHeight")
    
    while True:
        # 페이지 끝까지 스크롤
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        
        # 새 콘텐츠가 로드될 때까지 대기
        time.sleep(2)
        
        # 새 스크롤 높이 계산
        new_height = driver.execute_script("return document.body.scrollHeight")
        
        # 스크롤 높이가 같으면 더 이상 콘텐츠가 없는 것
        if new_height == last_height:
            break
            
        last_height = new_height
    
    # 동적으로 로드된 모든 항목 가져오기
    items = driver.find_elements(By.CSS_SELECTOR, ".item")
    
    for item in items:
        title = item.find_element(By.CSS_SELECTOR, ".title").text
        description = item.find_element(By.CSS_SELECTOR, ".description").text
        print(f"Title: {title}, Description: {description}")
        
except Exception as e:
    print(f"오류 발생: {e}")
    
finally:
    driver.quit()

 

 

자동화된 데이터 수집

복잡한 상호작용(클릭, 폼 제출, 드롭다운 선택 등)이 필요한 경우 Selenium의 자동화 기능을 활용할 수 있습니다.

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
import pandas as pd
import time

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
driver.get("https://www.example.com/search-form")

try:
    # 검색 폼 작성
    search_input = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "search-input"))
    )
    search_input.send_keys("데이터 사이언스")
    
    # 드롭다운 메뉴에서 옵션 선택
    category_dropdown = Select(driver.find_element(By.ID, "category"))
    category_dropdown.select_by_visible_text("기술")
    
    # 날짜 범위 설정
    date_from = driver.find_element(By.ID, "date-from")
    date_from.clear()
    date_from.send_keys("2023-01-01")
    
    date_to = driver.find_element(By.ID, "date-to")
    date_to.clear()
    date_to.send_keys("2023-12-31")
    
    # 검색 버튼 클릭
    driver.find_element(By.ID, "search-button").click()
    
    # 결과 로딩 대기
    WebDriverWait(driver, 15).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, ".search-results"))
    )
    
    # 결과 수집
    results = []
    result_elements = driver.find_elements(By.CSS_SELECTOR, ".result-item")
    
    for item in result_elements:
        result = {
            "title": item.find_element(By.CSS_SELECTOR, ".result-title").text,
            "date": item.find_element(By.CSS_SELECTOR, ".result-date").text,
            "description": item.find_element(By.CSS_SELECTOR, ".result-description").text,
            "url": item.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
        }
        results.append(result)
    
    # 데이터프레임으로 변환
    df = pd.DataFrame(results)
    print(f"{len(results)}개의 결과를 찾았습니다.")
    print(df.head())
    
    # CSV로 저장
    df.to_csv("search_results.csv", index=False, encoding="utf-8-sig")
    
except Exception as e:
    print(f"오류 발생: {e}")
    
finally:
    driver.quit()

 

 

대규모 데이터 수집

2.1 페이지네이션 처리

여러 페이지에 걸쳐 있는 데이터를 효율적으로 수집하는 방법입니다.

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import pandas as pd
import time

driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
url = "https://www.example.com/products?page=1"
driver.get(url)

all_products = []
current_page = 1
max_pages = 20  # 안전장치로 최대 페이지 수 설정

try:
    while current_page <= max_pages:
        print(f"현재 페이지: {current_page}")
        
        # 상품 목록이 로드될 때까지 대기
        WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".product-item"))
        )
        
        # 현재 페이지의 상품 정보 수집
        products = driver.find_elements(By.CSS_SELECTOR, ".product-item")
        
        for product in products:
            try:
                product_data = {
                    "name": product.find_element(By.CSS_SELECTOR, ".product-name").text,
                    "price": product.find_element(By.CSS_SELECTOR, ".product-price").text,
                    "rating": product.find_element(By.CSS_SELECTOR, ".product-rating").text,
                    "url": product.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
                }
                all_products.append(product_data)
            except NoSuchElementException:
                # 일부 요소가 없는 경우 건너뛰기
                continue
        
        # 다음 페이지 버튼 찾기
        try:
            next_button = driver.find_element(By.CSS_SELECTOR, ".pagination .next")
            
            # 다음 페이지 버튼이 비활성화되어 있으면 종료
            if "disabled" in next_button.get_attribute("class"):
                print("마지막 페이지에 도달했습니다.")
                break
                
            # 다음 페이지로 이동
            next_button.click()
            current_page += 1
            
            # 페이지 전환 대기
            time.sleep(2)
            
        except NoSuchElementException:
            print("다음 페이지 버튼을 찾을 수 없습니다.")
            break
            
except Exception as e:
    print(f"오류 발생: {e}")
    
finally:
    driver.quit()

# 수집한 데이터 처리
df = pd.DataFrame(all_products)
print(f"총 {len(all_products)}개의 상품 정보를 수집했습니다.")
df.to_csv("products.csv", index=False, encoding="utf-8-sig")

2.2 에러 핸들링

대규모 스크래핑에서는 다양한 예외 상황이 발생할 수 있습니다. 안정적인 스크래핑을 위한 에러 핸들링 방법입니다.

import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    NoSuchElementException, 
    TimeoutException, 
    StaleElementReferenceException,
    WebDriverException
)
import pandas as pd
import time
import random
import logging
from requests.exceptions import RequestException
import sys
from urllib3.exceptions import MaxRetryError
import traceback

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("scraping.log"),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# 재시도 함수
def retry_with_backoff(func, max_tries=5, backoff_factor=0.5):
    def wrapper(*args, **kwargs):
        tries = 0
        while tries < max_tries:
            try:
                return func(*args, **kwargs)
            except (RequestException, WebDriverException, MaxRetryError, TimeoutException) as e:
                tries += 1
                if tries == max_tries:
                    logger.error(f"최대 재시도 횟수 초과: {str(e)}")
                    raise
                
                sleep_time = backoff_factor * (2 ** (tries - 1)) + random.uniform(0, 1)
                logger.warning(f"오류 발생, {sleep_time:.2f}초 후 재시도 ({tries}/{max_tries}): {str(e)}")
                time.sleep(sleep_time)
    return wrapper

# 웹드라이버 초기화 함수
@retry_with_backoff
def init_driver():
    chrome_options = webdriver.ChromeOptions()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")
    
    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=chrome_options
    )
    return driver

# 페이지 로드 함수
@retry_with_backoff
def load_page(driver, url):
    driver.get(url)
    WebDriverWait(driver, 15).until(
        EC.presence_of_element_located((By.TAG_NAME, "body"))
    )

# 요소 찾기 함수
def safe_find_element(driver, by, value, timeout=10):
    try:
        element = WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((by, value))
        )
        return element
    except (TimeoutException, NoSuchElementException):
        logger.warning(f"요소를 찾을 수 없음: {by}={value}")
        return None

def safe_find_elements(driver, by, value, timeout=10):
    try:
        WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((by, value))
        )
        elements = driver.find_elements(by, value)
        return elements
    except (TimeoutException, NoSuchElementException):
        logger.warning(f"요소들을 찾을 수 없음: {by}={value}")
        return []

# 텍스트 추출 함수
def safe_get_text(element):
    if element is None:
        return ""
    try:
        return element.text.strip()
    except StaleElementReferenceException:
        logger.warning("StaleElementReferenceException 발생")
        return ""

# 속성 추출 함수
def safe_get_attribute(element, attribute):
    if element is None:
        return ""
    try:
        return element.get_attribute(attribute)
    except (StaleElementReferenceException, WebDriverException):
        logger.warning(f"속성 추출 중 오류 발생: {attribute}")
        return ""

# 예시: 제품 스크래핑 함수
def scrape_products():
    driver = None
    results = []
    
    try:
        driver = init_driver()
        base_url = "https://www.example.com/products?page="
        page = 1
        max_pages = 100
        
        while page <= max_pages:
            url = base_url + str(page)
            try:
                logger.info(f"페이지 로드 중: {url}")
                load_page(driver, url)
                
                # 스크래핑 간 간격 두기 (서버 부하 방지)
                time.sleep(random.uniform(1, 3))
                
                products = safe_find_elements(driver, By.CSS_SELECTOR, ".product-item")
                
                if not products:
                    logger.info("더 이상 제품이 없습니다. 스크래핑 종료.")
                    break
                
                logger.info(f"현재 페이지에서 {len(products)}개 제품 발견")
                
                for product in products:
                    try:
                        name_element = safe_find_element(driver, By.CSS_SELECTOR, ".product-name", timeout=5)
                        price_element = safe_find_element(driver, By.CSS_SELECTOR, ".product-price", timeout=5)
                        url_element = safe_find_element(driver, By.CSS_SELECTOR, "a", timeout=5)
                        
                        product_data = {
                            "name": safe_get_text(name_element),
                            "price": safe_get_text(price_element),
                            "url": safe_get_attribute(url_element, "href")
                        }
                        
                        if product_data["name"]:  # 이름이 있는 제품만 추가
                            results.append(product_data)
                        
                    except Exception as e:
                        logger.error(f"제품 처리 중 오류: {str(e)}")
                        continue
                
                # 다음 페이지 확인
                next_button = safe_find_element(driver, By.CSS_SELECTOR, ".pagination .next")
                if next_button and "disabled" not in safe_get_attribute(next_button, "class"):
                    next_button.click()
                    page += 1
                else:
                    logger.info("마지막 페이지에 도달했습니다.")
                    break
                    
            except Exception as e:
                logger.error(f"페이지 {page} 처리 중 오류: {str(e)}")
                traceback.print_exc()
                # 오류가 발생해도 다음 페이지로 진행
                page += 1
                continue
    
    except Exception as e:
        logger.error(f"스크래핑 프로세스 중 심각한 오류: {str(e)}")
        traceback.print_exc()
    
    finally:
        if driver:
            driver.quit()
        
        # 결과 저장
        if results:
            try:
                df = pd.DataFrame(results)
                output_file = f"products_{time.strftime('%Y%m%d_%H%M%S')}.csv"
                df.to_csv(output_file, index=False, encoding="utf-8-sig")
                logger.info(f"총 {len(results)}개의 제품 데이터를 {output_file}에 저장했습니다.")
            except Exception as e:
                logger.error(f"결과 저장 중 오류: {str(e)}")
                # 백업으로 JSON 형식으로 저장 시도
                import json
                with open("products_backup.json", "w", encoding="utf-8") as f:
                    json.dump(results, f, ensure_ascii=False, indent=2)
                logger.info("백업 JSON 파일로 저장 완료")
        
        return results

# 메인 실행
if __name__ == "__main__":
    try:
        scrape_products()
    except KeyboardInterrupt:
        logger.info("사용자에 의해 스크래핑이 중단되었습니다.")
    except Exception as e:
        logger.critical(f"예상치 못한 오류 발생: {str(e)}")
        traceback.print_exc()

2.3 데이터 저장 최적화

대량의 데이터를 효율적으로 저장하는 방법입니다.

import pandas as pd
import sqlite3
import json
import os
from datetime import datetime
import time
import logging

# 로깅 설정
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class DataStorage:
    def __init__(self, project_name):
        self.project_name = project_name
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.data_dir = f"data_{project_name}_{self.timestamp}"
        self.chunk_size = 1000  # 청크 크기
        self.buffer = []  # 메모리 버퍼
        
        # 데이터 디렉토리 생성
        if not os.path.exists(self.data_dir):
            os.makedirs(self.data_dir)
            logger.info(f"데이터 디렉토리 생성: {self.data_dir}")
            
        # SQLite 데이터베이스 초기화
        self.db_path = os.path.join(self.data_dir, f"{project_name}.db")
        self.conn = sqlite3.connect(self.db_path)
        logger.info(f"SQLite 데이터베이스 생성: {self.db_path}")
        
    def create_table(self, table_name, schema):
        """
        SQLite 테이블 생성
        
        Args:
            table_name (str): 테이블 이름
            schema (dict): 컬럼 이름과 타입을 정의한 딕셔너리
        """
        cursor = self.conn.cursor()
        columns = ", ".join([f"{col} {dtype}" for col, dtype in schema.items()])
        query = f"CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, {columns})"
        cursor.execute(query)
        self.conn.commit()
        logger.info(f"테이블 생성 완료: {table_name}")
        
    def add_item(self, table_name, item):
        """
        데이터 항목 추가 (버퍼에 저장)
        
        Args:
            table_name (str): 저장할 테이블 이름
            item (dict): 저장할 데이터 항목
        """
        self.buffer.append((table_name, item))
        
        # 버퍼 크기가 청크 크기에 도달하면 저장
        if len(self.buffer) >= self.chunk_size:
            self.save_buffer()
            
    def save_buffer(self):
        """버퍼에 있는 데이터를 DB에 저장"""
        if not self.buffer:
            return
            
        # 테이블별 데이터 그룹화
        grouped_data = {}
        for table_name, item in self.buffer:
            if table_name not in grouped_data:
                grouped_data[table_name] = []
            grouped_data[table_name].append(item)
            
        # 각 테이블에 데이터 저장
        cursor = self.conn.cursor()
        total_saved = 0
        
        for table_name, items in grouped_data.items():
            if not items:
                continue
                
            # 첫 항목의 컬럼 가져오기
            columns = list(items[0].keys())
            placeholders = ", ".join(["?"] * len(columns))
            column_str = ", ".join(columns)
            
            query = f"INSERT INTO {table_name} ({column_str}) VALUES ({placeholders})"
            
            # 데이터 변환
            values = [tuple(item.get(col, None) for col in columns) for item in items]
            
            cursor.executemany(query, values)
            total_saved += len(items)
            
        self.conn.commit()
        logger.info(f"{total_saved}개 항목을 데이터베이스에 저장했습니다.")
        
        # 버퍼 비우기
        self.buffer = []
        
    def save_to_csv(self, table_name, file_name=None):
        """
        테이블 데이터를 CSV로 내보내기
        
        Args:
            table_name (str): 내보낼 테이블 이름
            file_name (str, optional): 저장할 파일 이름
        """
        if file_name is None:
            file_name = f"{table_name}_{self.timestamp}.csv"
            
        file_path = os.path.join(self.data_dir, file_name)
        
        # 먼저 버퍼에 남은 데이터 저장
        self.save_buffer()
        
        # 데이터 조회 및 CSV 저장
        query = f"SELECT * FROM {table_name}"
        df = pd.read_sql_query(query, self.conn)
        df.to_csv(file_path, index=False, encoding="utf-8-sig")
        
        logger.info(f"{len(df)}개 행을 CSV 파일로 저장했습니다: {file_path}")
        return file_path
        
    def save_to_json(self, table_name, file_name=None):
        """
        테이블 데이터를 JSON으로 내보내기
        
        Args:
            table_name (str): 내보낼 테이블 이름
            file_name (str, optional): 저장할 파일 이름
        """
        if file_name is None:
            file_name = f"{table_name}_{self.timestamp}.json"
            
        file_path = os.path.join(self.data_dir, file_name)
        
        # 먼저 버퍼에 남은 데이터 저장
        self.save_buffer()
        
        # 데이터 조회 및 JSON 저장
        query = f"SELECT * FROM {table_name}"
        df = pd.read_sql_query(query, self.conn)
        
        # 데이터프레임을 레코드 방향 딕셔너리 리스트로 변환
        records = df.to_dict(orient="records")
        
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(records, f, ensure_ascii=False, indent=2)
            
        logger.info(f"{len(records)}개 레코드를 JSON 파일로 저장했습니다: {file_path}")
        return file_path
        
    def close(self):
        """연결된 리소스 정리"""
        # 버퍼에 남은 데이터 저장
        self.save_buffer()
        
        # 데이터베이스 연결 종료
        if self.conn:
            self.conn.close()
            logger.info("데이터베이스 연결이 종료되었습니다.")

# 사용 예시
def scrape_and_store_example():
    # 스토리지 초기화
    storage = DataStorage("product_scraper")
    
    # 테이블 스키마 정의
    schema = {
        "name": "TEXT",
        "price": "TEXT",
        "description": "TEXT",
        "url": "TEXT",
        "rating": "REAL",
        "review_count": "INTEGER",
        "category": "TEXT",
        "brand": "TEXT",
        "scraped_at": "TIMESTAMP"
    }
    
    # 테이블 생성
    storage.create_table("products", schema)
    
    try:
        # 스크래핑 로직 (간략화)
        from selenium import webdriver
        from selenium.webdriver.chrome.service import Service
        from webdriver_manager.chrome import ChromeDriverManager
        
        driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))
        
        for page in range(1, 10):
            logger.info(f"페이지 {page} 스크래핑 중...")
            url = f"https://www.example.com/products?page={page}"
            driver.get(url)
            time.sleep(2)
            
            # 여기서 실제 제품 데이터를 수집하는 코드가 들어갑니다
            # 샘플 데이터로 대체
            for i in range(20):
                product = {
                    "name": f"제품 {page}-{i}",
                    "price": f"{random.randint(10000, 100000)}원",
                    "description": f"제품 {page}-{i}에 대한 설명입니다.",
                    "url": f"https://www.example.com/product/{page*100+i}",
                    "rating": round(random.uniform(3.0, 5.0), 1),
                    "review_count": random.randint(0, 1000),
                    "category": random.choice(["전자기기", "의류", "식품", "가구"]),
                    "brand": random.choice(["브랜드A", "브랜드B", "브랜드C"]),
                    "scraped_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }
                
                # 수집한 데이터 저장
                storage.add_item("products", product)
                
        # 스크래핑 완료 후 CSV와 JSON으로 내보내기
        csv_path = storage.save_to_csv("products")
        json_path = storage