본문 바로가기
오픈소스를 위한 기초 상식

자동 리포트 생성 시스템 학습 자료

by 지나가는 프로도 2025. 3. 27.

1. 개요

자동 리포트 생성 시스템은 스크래핑한 데이터를 분석하고 정리하여 읽기 쉬운 보고서 형태로 자동 변환하는 도구입니다. 이 시스템은 다음과 같은 상황에서 유용합니다:

  • 주기적인 시장 분석 보고서 생성
  • 웹사이트 트래픽 및 성능 보고서 자동화
  • 소셜 미디어 트렌드 분석 리포팅
  • 경쟁사 가격 모니터링 보고서
  • 뉴스 기사 요약 및 트렌드 분석

이 학습 자료에서는 Python을 활용하여 데이터를 수집, 분석하고 PDF, HTML, 또는 Excel 형식의 전문적인 보고서로 자동 생성하는 방법을 다룹니다.

2. 시스템 구성 요소

자동 리포트 생성 시스템은 다음과 같은 주요 구성 요소로 이루어집니다:

  1. 데이터 수집 모듈: 웹 스크래핑, API 호출 등을 통한 데이터 수집
  2. 데이터 처리 모듈: 수집된 데이터 정제, 변환, 분석
  3. 시각화 모듈: 차트, 그래프, 표 등 시각적 요소 생성
  4. 템플릿 엔진: 보고서 레이아웃 및 디자인 템플릿 관리
  5. 보고서 생성기: 최종 보고서 파일 생성 (PDF, HTML, Excel 등)
  6. 배포 모듈: 이메일 전송, 클라우드 저장, 웹 게시 등

3. 필요한 라이브러리 및 도구

# 데이터 수집 및 처리
pip install requests beautifulsoup4 selenium pandas numpy

# 데이터 시각화
pip install matplotlib seaborn plotly

# 보고서 생성
pip install jinja2 pdfkit weasyprint xlsxwriter

# 스케줄링 및 자동화
pip install schedule apscheduler

4. 구현 예제: 상품 가격 모니터링 리포트 시스템

다음은 온라인 쇼핑몰의 상품 가격을 모니터링하고 주간 보고서를 생성하는 자동화 시스템의 예제입니다.

4.1 시스템 구조

price_report_system/
├── config.py               # 설정 및 환경 변수
├── data_collector.py       # 데이터 수집 모듈
├── data_processor.py       # 데이터 처리 및 분석 모듈 
├── visualizer.py           # 데이터 시각화 모듈
├── report_generator.py     # 보고서 생성 모듈
├── distributor.py          # 보고서 배포 모듈
├── templates/              # 보고서 템플릿 디렉토리
│   ├── report_template.html  # HTML 템플릿
│   └── email_template.html   # 이메일 템플릿
├── static/                 # 정적 자원 디렉토리 
│   ├── css/                  # 스타일시트
│   └── images/               # 이미지 자원
├── data/                   # 데이터 저장 디렉토리
│   ├── raw/                  # 원시 데이터
│   └── processed/            # 처리된 데이터
├── reports/                # 생성된 보고서 디렉토리
└── main.py                 # 메인 실행 파일

4.2 데이터 수집 모듈 (data_collector.py)

import requests
import pandas as pd
from bs4 import BeautifulSoup
import logging
import time
import random
import sqlite3
from datetime import datetime
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

# 로깅 설정
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='price_monitor.log'
)
logger = logging.getLogger(__name__)

class PriceDataCollector:
    def __init__(self, db_path="data/price_data.db"):
        """가격 데이터 수집기 초기화"""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.create_tables()
        self.driver = None
        
    def create_tables(self):
        """필요한 데이터베이스 테이블 생성"""
        cursor = self.conn.cursor()
        
        # 제품 정보 테이블
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT UNIQUE,
            name TEXT,
            category TEXT,
            brand TEXT,
            url TEXT
        )
        ''')
        
        # 가격 기록 테이블
        cursor.execute('''
        CREATE TABLE IF NOT EXISTS price_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT,
            price REAL,
            original_price REAL,
            discount_rate REAL,
            store TEXT,
            timestamp DATETIME,
            FOREIGN KEY (product_id) REFERENCES products(product_id)
        )
        ''')
        
        self.conn.commit()
        
    def init_selenium(self):
        """Selenium 웹드라이버 초기화"""
        if self.driver is not None:
            return
            
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--window-size=1920,1080")
        chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/98.0.4758.102 Safari/537.36")
        
        try:
            self.driver = webdriver.Chrome(
                service=Service(ChromeDriverManager().install()),
                options=chrome_options
            )
            logger.info("Selenium 웹드라이버 초기화 완료")
        except Exception as e:
            logger.error(f"Selenium 웹드라이버 초기화 실패: {str(e)}")
            raise
    
    def close_selenium(self):
        """Selenium 웹드라이버 종료"""
        if self.driver:
            self.driver.quit()
            self.driver = None
    
    def collect_product_info(self, url, product_id, category):
        """제품 정보 수집"""
        self.init_selenium()
        
        try:
            self.driver.get(url)
            time.sleep(random.uniform(2, 4))  # 랜덤 대기
            
            # 제품명 추출
            name_element = self.driver.find_element_by_css_selector("h1.product-name")
            name = name_element.text.strip()
            
            # 브랜드 추출
            brand_element = self.driver.find_element_by_css_selector("div.brand-name")
            brand = brand_element.text.strip()
            
            # 제품 정보 저장
            cursor = self.conn.cursor()
            cursor.execute('''
            INSERT OR REPLACE INTO products (product_id, name, category, brand, url)
            VALUES (?, ?, ?, ?, ?)
            ''', (product_id, name, category, brand, url))
            
            self.conn.commit()
            logger.info(f"제품 정보 수집 완료: {name}")
            
            return {
                "product_id": product_id,
                "name": name,
                "category": category,
                "brand": brand,
                "url": url
            }
            
        except Exception as e:
            logger.error(f"제품 정보 수집 실패: {str(e)}")
            return None
    
    def collect_price_data(self, product_id, url, store):
        """제품 가격 정보 수집"""
        self.init_selenium()
        
        try:
            self.driver.get(url)
            time.sleep(random.uniform(2, 4))  # 랜덤 대기
            
            # 현재 판매 가격 추출
            price_element = self.driver.find_element_by_css_selector("span.current-price")
            price_text = price_element.text.strip().replace('원', '').replace(',', '')
            price = float(price_text)
            
            # 원래 가격 추출 (있는 경우)
            try:
                original_price_element = self.driver.find_element_by_css_selector("span.original-price")
                original_price_text = original_price_element.text.strip().replace('원', '').replace(',', '')
                original_price = float(original_price_text)
                
                # 할인율 계산
                if original_price > 0:
                    discount_rate = round((original_price - price) / original_price * 100, 2)
                else:
                    discount_rate = 0.0
            except:
                original_price = price
                discount_rate = 0.0
            
            # 현재 시간
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            
            # 가격 정보 저장
            cursor = self.conn.cursor()
            cursor.execute('''
            INSERT INTO price_history (product_id, price, original_price, discount_rate, store, timestamp)
            VALUES (?, ?, ?, ?, ?, ?)
            ''', (product_id, price, original_price, discount_rate, store, timestamp))
            
            self.conn.commit()
            logger.info(f"가격 정보 수집 완료: {product_id}, 가격: {price}원")
            
            return {
                "product_id": product_id,
                "price": price,
                "original_price": original_price,
                "discount_rate": discount_rate,
                "store": store,
                "timestamp": timestamp
            }
            
        except Exception as e:
            logger.error(f"가격 정보 수집 실패: {str(e)}")
            return None
    
    def collect_competitor_prices(self, product_name, num_competitors=3):
        """경쟁사 가격 정보 수집"""
        search_url = f"https://search.shopping.naver.com/search/all?query={product_name}"
        self.init_selenium()
        
        try:
            self.driver.get(search_url)
            time.sleep(random.uniform(3, 5))
            
            # 검색 결과 상품 목록
            product_elements = self.driver.find_elements_by_css_selector("div.product_item")
            
            competitor_data = []
            for idx, product_elem in enumerate(product_elements[:num_competitors]):
                try:
                    # 상품명
                    name = product_elem.find_element_by_css_selector("a.product_title").text.strip()
                    
                    # 가격
                    price_text = product_elem.find_element_by_css_selector("span.price").text.strip()
                    price_text = price_text.replace('원', '').replace(',', '')
                    price = float(price_text)
                    
                    # 판매처
                    store = product_elem.find_element_by_css_selector("span.store").text.strip()
                    
                    competitor_data.append({
                        "name": name,
                        "price": price,
                        "store": store
                    })
                    
                except Exception as e:
                    logger.warning(f"경쟁사 상품 처리 중 오류: {str(e)}")
                    continue
                    
                if len(competitor_data) >= num_competitors:
                    break
            
            logger.info(f"{product_name}의 경쟁사 가격 {len(competitor_data)}개 수집 완료")
            return competitor_data
            
        except Exception as e:
            logger.error(f"경쟁사 가격 수집 실패: {str(e)}")
            return []
    
    def collect_all_data(self, product_list):
        """모든 제품 데이터 수집"""
        try:
            self.init_selenium()
            collected_data = []
            
            for product in product_list:
                product_id = product.get('product_id')
                url = product.get('url')
                category = product.get('category')
                store = product.get('store', '자사몰')
                
                # 제품 정보 수집
                product_info = self.collect_product_info(url, product_id, category)
                
                if product_info:
                    # 가격 정보 수집
                    price_info = self.collect_price_data(product_id, url, store)
                    
                    if price_info:
                        # 경쟁사 가격 수집
                        competitor_prices = self.collect_competitor_prices(product_info['name'])
                        
                        collected_data.append({
                            "product_info": product_info,
                            "price_info": price_info,
                            "competitor_prices": competitor_prices
                        })
                
                # 요청 간 딜레이
                time.sleep(random.uniform(5, 10))
            
            logger.info(f"총 {len(collected_data)}개 제품 데이터 수집 완료")
            return collected_data
            
        except Exception as e:
            logger.error(f"데이터 수집 중 오류 발생: {str(e)}")
            return []
        finally:
            self.close_selenium()
    
    def get_price_history(self, product_id, days=30):
        """제품의 가격 히스토리 조회"""
        try:
            query = f"""
            SELECT price, original_price, discount_rate, timestamp 
            FROM price_history 
            WHERE product_id = ? 
            AND datetime(timestamp) >= datetime('now', '-{days} days')
            ORDER BY timestamp
            """
            
            df = pd.read_sql_query(query, self.conn, params=(product_id,))
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            
            return df
        except Exception as e:
            logger.error(f"가격 히스토리 조회 실패: {str(e)}")
            return pd.DataFrame()
    
    def close(self):
        """리소스 정리"""
        self.close_selenium()
        if self.conn:
            self.conn.close()

# 실행 예시
if __name__ == "__main__":
    # 제품 목록 예시
    products = [
        {
            "product_id": "PROD001",
            "url": "https://example.com/product/1",
            "category": "전자기기"
        },
        {
            "product_id": "PROD002",
            "url": "https://example.com/product/2",
            "category": "가전제품"
        }
    ]
    
    collector = PriceDataCollector()
    try:
        data = collector.collect_all_data(products)
        print(f"수집된 데이터: {len(data)}개")
    finally:
        collector.close()

4.3 데이터 처리 모듈 (data_processor.py)

import pandas as pd
import numpy as np
import sqlite3
import logging
from datetime import datetime, timedelta

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='price_processor.log'
)
logger = logging.getLogger(__name__)

class PriceDataProcessor:
    def __init__(self, db_path="data/price_data.db"):
        """가격 데이터 처리기 초기화"""
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        
    def get_products(self, category=None):
        """제품 정보 조회"""
        query = "SELECT * FROM products"
        params = []
        
        if category:
            query += " WHERE category = ?"
            params.append(category)
            
        try:
            df = pd.read_sql_query(query, self.conn, params=params)
            return df
        except Exception as e:
            logger.error(f"제품 정보 조회 실패: {str(e)}")
            return pd.DataFrame()
    
    def get_price_trends(self, days=30):
        """전체 제품의 가격 추세 분석"""
        try:
            query = f"""
            SELECT p.product_id, p.name, p.category, p.brand, 
                   ph.price, ph.original_price, ph.discount_rate, ph.timestamp
            FROM products p
            JOIN price_history ph ON p.product_id = ph.product_id
            WHERE datetime(ph.timestamp) >= datetime('now', '-{days} days')
            ORDER BY p.category, p.name, ph.timestamp
            """
            
            df = pd.read_sql_query(query, self.conn)
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            
            # 일자별 평균 가격 계산
            df['date'] = df['timestamp'].dt.date
            daily_avg = df.groupby(['date', 'category']).agg({
                'price': 'mean',
                'discount_rate': 'mean'
            }).reset_index()
            
            # 카테고리별 평균 가격 변동률 계산
            category_trends = {}
            for category in daily_avg['category'].unique():
                cat_data = daily_avg[daily_avg['category'] == category].sort_values('date')
                if len(cat_data) > 1:
                    first_price = cat_data['price'].iloc[0]
                    last_price = cat_data['price'].iloc[-1]
                    price_change_pct = (last_price - first_price) / first_price * 100
                    
                    category_trends[category] = {
                        'start_price': first_price,
                        'end_price': last_price,
                        'change_pct': price_change_pct,
                        'avg_discount': cat_data['discount_rate'].mean()
                    }
            
            return {
                'daily_avg': daily_avg,
                'category_trends': category_trends
            }
            
        except Exception as e:
            logger.error(f"가격 추세 분석 실패: {str(e)}")
            return {'daily_avg': pd.DataFrame(), 'category_trends': {}}
    
    def identify_price_changes(self, days=7, threshold_pct=5):
        """최근 가격 변동이 큰 제품 식별"""
        try:
            # 각 제품의 최근 가격과 이전 가격 조회
            current_prices = pd.read_sql_query(f"""
                SELECT ph.product_id, p.name, p.category, p.brand, ph.price, ph.timestamp
                FROM price_history ph
                JOIN products p ON ph.product_id = p.product_id
                WHERE ph.timestamp = (
                    SELECT MAX(timestamp) FROM price_history
                    WHERE product_id = ph.product_id
                )
            """, self.conn)
            
            # threshold_pct 이상 변동된 제품 찾기
            significant_changes = []
            
            for _, row in current_prices.iterrows():
                # 이전 가격 조회 (현재 가격보다 이전)
                previous_price = pd.read_sql_query(f"""
                    SELECT price, timestamp
                    FROM price_history
                    WHERE product_id = ?
                    AND timestamp < ?
                    ORDER BY timestamp DESC
                    LIMIT 1
                """, self.conn, params=(row['product_id'], row['timestamp']))
                
                if not previous_price.empty:
                    prev_price = previous_price['price'].iloc[0]
                    curr_price = row['price']
                    
                    # 변동률 계산
                    if prev_price > 0:
                        change_pct = (curr_price - prev_price) / prev_price * 100
                        
                        if abs(change_pct) >= threshold_pct:
                            significant_changes.append({
                                'product_id': row['product_id'],
                                'name': row['name'],
                                'category': row['category'],
                                'brand': row['brand'],
                                'previous_price': prev_price,
                                'current_price': curr_price,
                                'change_pct': change_pct,
                                'timestamp': row['timestamp']
                            })
            
            return pd.DataFrame(significant_changes)
            
        except Exception as e:
            logger.error(f"가격 변동 분석 실패: {str(e)}")
            return pd.DataFrame()
    
    def compare_with_competitors(self):
        """자사 제품과 경쟁사 제품 가격 비교 분석"""
        # 이 부분은 경쟁사 데이터가 별도의 테이블에 저장되어 있다고 가정
        try:
            query = """
            SELECT p.product_id, p.name, p.category, p.brand, 
                   ph.price as our_price, 
                   cp.competitor_name, cp.price as competitor_price,
                   cp.timestamp
            FROM products p
            JOIN price_history ph ON p.product_id = ph.product_id
            JOIN competitor_prices cp ON p.product_id = cp.product_id
            WHERE ph.timestamp = (
                SELECT MAX(timestamp) FROM price_history
                WHERE product_id = p.product_id
            )
            AND cp.timestamp >= datetime('now', '-7 days')
            ORDER BY p.category, p.name, cp.competitor_name
            """
            
            df = pd.read_sql_query(query, self.conn)
            
            # 제품별 가격 차이 분석
            df['price_diff'] = df['our_price'] - df['competitor_price']
            df['price_diff_pct'] = (df['price_diff'] / df['competitor_price']) * 100
            
            # 각 제품별 경쟁사 가격 비교 요약
            product_comparisons = []
            for product_id in df['product_id'].unique():
                product_data = df[df['product_id'] == product_id]
                
                our_price = product_data['our_price'].iloc[0]
                avg_competitor_price = product_data['competitor_price'].mean()
                min_competitor_price = product_data['competitor_price'].min()
                max_competitor_price = product_data['competitor_price'].max()
                
                price_position = (our_price - min_competitor_price) / (max_competitor_price - min_competitor_price) * 100 if max_competitor_price > min_competitor_price else 50
                
                product_comparisons.append({
                    'product_id': product_id,
                    'name': product_data['name'].iloc[0],
                    'category': product_data['category'].iloc[0],
                    'our_price': our_price,
                    'avg_competitor_price': avg_competitor_price,
                    'min_competitor_price': min_competitor_price,
                    'max_competitor_price': max_competitor_price,
                    'price_diff_pct': ((our_price - avg_competitor_price) / avg_competitor_price) * 100,
                    'price_position': price_position  # 0%: 최저가, 100%: 최고가
                })
            
            return {
                'detailed': df,
                'summary': pd.DataFrame(product_comparisons)
            }
            
        except Exception as e:
            logger.error(f"경쟁사 비교 분석 실패: {str(e)}")
            return {'detailed': pd.DataFrame(), 'summary': pd.DataFrame()}
    
    def generate_insights(self):
        """주요 인사이트 생성"""
        insights = []
        
        try:
            # 1. 최근 가격 변동이 큰 제품 파악
            price_changes = self.identify_price_changes(threshold_pct=5)
            if not price_changes.empty:
                increased = price_changes[price_changes['change_pct'] > 0]
                decreased = price_changes[price_changes['change_pct'] < 0]
                
                if not increased.empty:
                    biggest_increase = increased.loc[increased['change_pct'].idxmax()]
                    insights.append(f"가장 큰 가격 인상: {biggest_increase['name']} ({biggest_increase['change_pct']:.1f}% 상승)")
                
                if not decreased.empty:
                    biggest_decrease = decreased.loc[decreased['change_pct'].idxmin()]
                    insights.append(f"가장 큰 가격 인하: {biggest_decrease['name']} ({abs(biggest_decrease['change_pct']):.1f}% 하락)")
            
            # 2. 카테고리별 가격 추세
            price_trends = self.get_price_trends()
            category_trends = price_trends.get('category_trends', {})
            
            for category, trend in category_trends.items():
                if trend['change_pct'] > 3:
                    insights.append(f"{category} 카테고리 가격 상승 추세: {trend['change_pct']:.1f}% 증가")
                elif trend['change_pct'] < -3:
                    insights.append(f"{category} 카테고리 가격 하락 추세: {abs(trend['change_pct']):.1f}% 감소")
            
            # 3. 경쟁사 비교 인사이트
            competitor_comparison = self.compare_with_competitors()
            summary = competitor_comparison.get('summary', pd.DataFrame())
            
            if not summary.empty:
                underpriced = summary[summary['price_diff_pct'] < -10]
                overpriced = summary[summary['price_diff_pct'] > 10]
                
                if not underpriced.empty:
                    insights.append(f"{len(underpriced)}개 제품이 경쟁사 대비 10% 이상 저렴합니다.")
                    most_underpriced = underpriced.loc[underpriced['price_diff_pct'].idxmin()]
                    insights.append(f"가장 저렴한 제품: {most_underpriced['name']} (경쟁사 대비 {abs(most_underpriced['price_diff_pct']):.1f}% 저렴)")
                
                if not overpriced.empty:
                    insights.append(f"{len(overpriced)}개 제품이 경쟁사 대비 10% 이상 비쌉니다.")
                    most_overpriced = overpriced.loc[overpriced['price_diff_pct'].idxmax()]
                    insights.append(f"가장 비싼 제품: {most_overpriced['name']} (경쟁사 대비 {most_overpriced['price_diff_pct']:.1f}% 비쌈)")
            
            return insights
            
        except Exception as e:
            logger.error(f"인사이트 생성 실패: {str(e)}")
            return ["데이터 분석 중 오류가 발생했습니다."]
    
    def prepare_report_data(self):
        """보고서용 데이터 준비"""
        try:
            report_data = {
                'generated_at': datetime.now().strftime("%Y-%m-%d %H:%M"),
                'period': {
                    'start': (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d"),
                    'end': datetime.now().strftime("%Y-%m-%d")
                }
            }
            
            # 제품 정보
            products_df = self.get_products()
            report_data['product_count'] = len(products_df)
            report_data['category_count'] = products_df['category'].nunique()
            
            # 가격 추세
            price_trends = self.get_price_trends()
            report_data['price_trends'] = price_trends
            
            # 가격 변동
            price_changes = self.identify_price_changes()
            report_data['price_changes'] = {
                'data': price_changes.to_dict('records') if not price_changes.empty else [],
                'count': len(price_changes)
            }
            
            # 경쟁사 비교
            competitor_comparison = self.compare_with_competitors()
            report_data['competitor_comparison'] = {