본문 바로가기
오픈소스를 위한 기초 상식

데이터 수집-분석-저장 파이프라인 구축 가이드

by 지나가는 프로도 2025. 3. 26.

데이터 파이프라인은 데이터의 수집부터 분석, 저장까지의 전체 과정을 자동화하는 시스템입니다. 이 가이드에서는 Python과 SQLite를 활용하여 효율적인 데이터 파이프라인을 구축하는 방법을 단계별로 알아보겠습니다.

1. 파이프라인 아키텍처 설계

데이터 파이프라인은 크게 다음 세 단계로 구성됩니다:

  1. 데이터 수집: 다양한 소스에서 데이터를 가져옵니다.
  2. 데이터 분석/변환: 수집된 데이터를 처리하고 분석합니다.
  3. 데이터 저장: 분석된 데이터를 저장하고 필요할 때 접근할 수 있게 합니다.

파이프라인을 구축하기 위한 기본 프로젝트 구조는 다음과 같습니다:

data_pipeline/
│
├── config/
│   └── config.py          # 설정 파일
│
├── collectors/
│   ├── __init__.py
│   ├── api_collector.py   # API 데이터 수집기
│   ├── web_scraper.py     # 웹 스크래핑 수집기
│   └── file_reader.py     # 파일 데이터 수집기
│
├── processors/
│   ├── __init__.py
│   ├── cleaner.py         # 데이터 정제
│   ├── transformer.py     # 데이터 변환
│   └── analyzer.py        # 데이터 분석
│
├── storage/
│   ├── __init__.py
│   ├── sqlite_manager.py  # SQLite 관리
│   └── file_manager.py    # 파일 저장 관리
│
├── utils/
│   ├── __init__.py
│   ├── logger.py          # 로깅 유틸리티
│   └── helpers.py         # 도우미 함수
│
├── pipeline.py            # 파이프라인 메인 실행 파일
└── requirements.txt       # 필요한 패키지 목록

2. 데이터 수집 구현

다양한 소스에서 데이터를 수집하는 방법을 살펴보겠습니다.

2.1 설정 파일 구성

# config/config.py
import os
from datetime import datetime

# 기본 경로 설정
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, 'data')
LOG_DIR = os.path.join(BASE_DIR, 'logs')

# 데이터베이스 설정
DATABASE = {
    'name': os.path.join(DATA_DIR, 'pipeline_data.db'),
    'backup_dir': os.path.join(DATA_DIR, 'backups')
}

# API 설정
API_CONFIG = {
    'weather': {
        'url': 'https://api.openweathermap.org/data/2.5/weather',
        'key': 'your_api_key',
        'params': {
            'q': 'Seoul',
            'units': 'metric'
        }
    },
    'stock': {
        'url': 'https://api.alphavantage.co/query',
        'key': 'your_api_key',
        'params': {
            'function': 'TIME_SERIES_DAILY',
            'symbol': 'AAPL'
        }
    }
}

# 웹 스크래핑 설정
SCRAPING_CONFIG = {
    'news': {
        'url': 'https://news.naver.com/',
        'selectors': {
            'headlines': '.headline_list a',
            'content': '.content'
        }
    }
}

# 파일 데이터 설정
FILE_CONFIG = {
    'sales_data': {
        'path': os.path.join(DATA_DIR, 'sales'),
        'format': 'csv',
        'encoding': 'utf-8'
    }
}

# 로깅 설정
LOG_CONFIG = {
    'filename': os.path.join(LOG_DIR, f'pipeline_{datetime.now().strftime("%Y%m%d")}.log'),
    'level': 'INFO'
}

# 각 디렉토리가 존재하지 않으면 생성
for directory in [DATA_DIR, LOG_DIR, DATABASE['backup_dir']]:
    if not os.path.exists(directory):
        os.makedirs(directory)

2.2 로깅 설정

# utils/logger.py
import logging
import os
from config.config import LOG_CONFIG

def setup_logger(name='pipeline'):
    """로깅 설정"""
    
    # 로그 디렉토리 확인
    log_dir = os.path.dirname(LOG_CONFIG['filename'])
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    
    # 로거 설정
    logger = logging.getLogger(name)
    logger.setLevel(getattr(logging, LOG_CONFIG['level']))
    
    # 이미 핸들러가 있으면 추가하지 않음
    if not logger.handlers:
        # 파일 핸들러
        file_handler = logging.FileHandler(LOG_CONFIG['filename'])
        file_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_format)
        logger.addHandler(file_handler)
        
        # 콘솔 핸들러
        console_handler = logging.StreamHandler()
        console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(console_format)
        logger.addHandler(console_handler)
    
    return logger

2.3 API 데이터 수집기

# collectors/api_collector.py
import requests
import json
from datetime import datetime
import os
from config.config import API_CONFIG
from utils.logger import setup_logger

class APICollector:
    """API에서 데이터를 수집하는 클래스"""
    
    def __init__(self, api_name):
        """
        API 수집기 초기화
        
        Parameters:
        api_name (str): 설정 파일에 정의된 API 이름 ('weather', 'stock' 등)
        """
        self.logger = setup_logger(f'collector.api.{api_name}')
        
        if api_name not in API_CONFIG:
            raise ValueError(f"알 수 없는 API: {api_name}. 유효한 API: {list(API_CONFIG.keys())}")
        
        self.api_name = api_name
        self.config = API_CONFIG[api_name]
        self.base_url = self.config['url']
        self.api_key = self.config['key']
        self.params = self.config['params'].copy()
        
        self.logger.info(f"{api_name} API 수집기 초기화됨")
    
    def collect(self, additional_params=None):
        """
        API에서 데이터를 수집
        
        Parameters:
        additional_params (dict, optional): 기본 파라미터에 추가할 파라미터
        
        Returns:
        dict: 수집된 데이터, 수집 시간, 수집 소스 정보
        """
        try:
            # 파라미터 설정
            params = self.params.copy()
            if additional_params:
                params.update(additional_params)
            
            # API 키 추가
            params['appid' if self.api_name == 'weather' else 'apikey'] = self.api_key
            
            # API 요청
            self.logger.info(f"{self.api_name} API 요청 시작: {self.base_url}")
            response = requests.get(self.base_url, params=params)
            response.raise_for_status()  # HTTP 에러 발생 시 예외 발생
            
            # 응답 처리
            data = response.json()
            
            # 메타데이터 추가
            result = {
                'data': data,
                'source': self.api_name,
                'collected_at': datetime.now().isoformat(),
                'status': 'success'
            }
            
            self.logger.info(f"{self.api_name} 데이터 수집 성공")
            return result
            
        except requests.exceptions.RequestException as e:
            self.logger.error(f"{self.api_name} API 요청 실패: {e}")
            return {
                'data': None,
                'source': self.api_name,
                'collected_at': datetime.now().isoformat(),
                'status': 'error',
                'error': str(e)
            }
    
    def save_raw_data(self, data, output_dir):
        """수집된 원시 데이터를 파일로 저장"""
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = os.path.join(output_dir, f"{self.api_name}_{timestamp}.json")
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"원시 데이터가 저장됨: {filename}")
        return filename

# 사용 예시
if __name__ == "__main__":
    from config.config import DATA_DIR
    
    # 날씨 데이터 수집
    weather_collector = APICollector('weather')
    weather_data = weather_collector.collect()
    
    # 원시 데이터 저장
    raw_dir = os.path.join(DATA_DIR, 'raw', 'weather')
    weather_collector.save_raw_data(weather_data, raw_dir)

2.4 웹 스크래핑 수집기

# collectors/web_scraper.py
import requests
from bs4 import BeautifulSoup
import json
import os
from datetime import datetime
from config.config import SCRAPING_CONFIG
from utils.logger import setup_logger

class WebScraper:
    """웹사이트에서 데이터를 스크래핑하는 클래스"""
    
    def __init__(self, scraper_name):
        """
        웹 스크래퍼 초기화
        
        Parameters:
        scraper_name (str): 설정 파일에 정의된 스크래퍼 이름 ('news' 등)
        """
        self.logger = setup_logger(f'collector.scraper.{scraper_name}')
        
        if scraper_name not in SCRAPING_CONFIG:
            raise ValueError(f"알 수 없는 스크래퍼: {scraper_name}. 유효한 스크래퍼: {list(SCRAPING_CONFIG.keys())}")
        
        self.scraper_name = scraper_name
        self.config = SCRAPING_CONFIG[scraper_name]
        self.url = self.config['url']
        self.selectors = self.config['selectors']
        
        self.logger.info(f"{scraper_name} 웹 스크래퍼 초기화됨")
    
    def collect(self, url=None):
        """
        웹사이트에서 데이터를 스크래핑
        
        Parameters:
        url (str, optional): 스크래핑할 대체 URL
        
        Returns:
        dict: 수집된 데이터, 수집 시간, 수집 소스 정보
        """
        try:
            target_url = url if url else self.url
            
            # 헤더 설정으로 차단 방지
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            
            self.logger.info(f"{self.scraper_name} 웹 스크래핑 시작: {target_url}")
            response = requests.get(target_url, headers=headers)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 스크래핑 결과 저장
            result_data = {}
            
            # 각 선택자에 해당하는 데이터 추출
            for name, selector in self.selectors.items():
                elements = soup.select(selector)
                
                if name == 'headlines':
                    # 헤드라인은 텍스트와 링크 모두 추출
                    result_data[name] = [
                        {
                            'text': el.get_text(strip=True),
                            'link': el.get('href', '')
                        } for el in elements if el.get_text(strip=True)
                    ]
                else:
                    # 일반 텍스트 데이터
                    result_data[name] = [el.get_text(strip=True) for el in elements if el.get_text(strip=True)]
            
            # 메타데이터 추가
            result = {
                'data': result_data,
                'source': self.scraper_name,
                'url': target_url,
                'collected_at': datetime.now().isoformat(),
                'status': 'success'
            }
            
            self.logger.info(f"{self.scraper_name} 데이터 스크래핑 성공: {len(result_data.get('headlines', []))} 헤드라인")
            return result
            
        except Exception as e:
            self.logger.error(f"{self.scraper_name} 웹 스크래핑 실패: {e}")
            return {
                'data': None,
                'source': self.scraper_name,
                'url': url if url else self.url,
                'collected_at': datetime.now().isoformat(),
                'status': 'error',
                'error': str(e)
            }
    
    def save_raw_data(self, data, output_dir):
        """스크래핑된 원시 데이터를 파일로 저장"""
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = os.path.join(output_dir, f"{self.scraper_name}_{timestamp}.json")
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"원시 데이터가 저장됨: {filename}")
        return filename

# 사용 예시
if __name__ == "__main__":
    from config.config import DATA_DIR
    
    # 뉴스 데이터 스크래핑
    news_scraper = WebScraper('news')
    news_data = news_scraper.collect()
    
    # 원시 데이터 저장
    raw_dir = os.path.join(DATA_DIR, 'raw', 'news')
    news_scraper.save_raw_data(news_data, raw_dir)

2.5 파일 데이터 수집기

# collectors/file_reader.py
import os
import glob
import pandas as pd
import json
from datetime import datetime
from config.config import FILE_CONFIG
from utils.logger import setup_logger

class FileDataCollector:
    """파일에서 데이터를 수집하는 클래스"""
    
    def __init__(self, data_name):
        """
        파일 데이터 수집기 초기화
        
        Parameters:
        data_name (str): 설정 파일에 정의된 데이터 이름 ('sales_data' 등)
        """
        self.logger = setup_logger(f'collector.file.{data_name}')
        
        if data_name not in FILE_CONFIG:
            raise ValueError(f"알 수 없는 데이터 소스: {data_name}. 유효한 소스: {list(FILE_CONFIG.keys())}")
        
        self.data_name = data_name
        self.config = FILE_CONFIG[data_name]
        self.path = self.config['path']
        self.format = self.config['format'].lower()
        self.encoding = self.config.get('encoding', 'utf-8')
        
        self.logger.info(f"{data_name} 파일 데이터 수집기 초기화됨")
    
    def collect(self, file_pattern=None):
        """
        파일에서 데이터를 수집
        
        Parameters:
        file_pattern (str, optional): 특정 파일 패턴 (예: "sales_*.csv")
        
        Returns:
        dict: 수집된 데이터, 수집 시간, 수집 소스 정보
        """
        try:
            # 파일 패턴 설정
            if file_pattern:
                search_pattern = os.path.join(self.path, file_pattern)
            else:
                search_pattern = os.path.join(self.path, f"*.{self.format}")
            
            # 파일 목록 가져오기
            files = glob.glob(search_pattern)
            
            if not files:
                self.logger.warning(f"패턴과 일치하는 파일 없음: {search_pattern}")
                return {
                    'data': None,
                    'source': self.data_name,
                    'pattern': search_pattern,
                    'collected_at': datetime.now().isoformat(),
                    'status': 'warning',
                    'message': '일치하는 파일 없음'
                }
            
            self.logger.info(f"{len(files)} 개의 파일 발견: {search_pattern}")
            
            # 모든 파일에서 데이터 수집
            all_data = []
            file_metadata = []
            
            for file_path in files:
                self.logger.info(f"파일 읽는 중: {file_path}")
                
                file_data = self._read_file(file_path)
                if file_data is not None:
                    all_data.append(file_data)
                    
                    # 파일 메타데이터 추가
                    file_info = {
                        'path': file_path,
                        'filename': os.path.basename(file_path),
                        'size': os.path.getsize(file_path),
                        'modified': datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
                    }
                    file_metadata.append(file_info)
            
            # 메타데이터 추가
            result = {
                'data': all_data,
                'source': self.data_name,
                'files': file_metadata,
                'collected_at': datetime.now().isoformat(),
                'status': 'success',
                'file_count': len(all_data)
            }
            
            self.logger.info(f"{self.data_name} 파일 데이터 수집 성공")
            return result
            
        except Exception as e:
            self.logger.error(f"{self.data_name} 파일 데이터 수집 실패: {e}")
            return {
                'data': None,
                'source': self.data_name,
                'pattern': search_pattern if 'search_pattern' in locals() else None,
                'collected_at': datetime.now().isoformat(),
                'status': 'error',
                'error': str(e)
            }
    
    def _read_file(self, file_path):
        """파일 형식에 따라 적절한 방법으로 파일 읽기"""
        try:
            if self.format == 'csv':
                df = pd.read_csv(file_path, encoding=self.encoding)
                return df.to_dict(orient='records')
            
            elif self.format == 'excel' or file_path.endswith(('.xlsx', '.xls')):
                df = pd.read_excel(file_path)
                return df.to_dict(orient='records')
            
            elif self.format == 'json':
                with open(file_path, 'r', encoding=self.encoding) as f:
                    return json.load(f)
            
            elif self.format == 'txt':
                with open(file_path, 'r', encoding=self.encoding) as f:
                    return f.readlines()
            
            else:
                self.logger.warning(f"지원되지 않는 파일 형식: {self.format}")
                return None
                
        except Exception as e:
            self.logger.error(f"파일 읽기 실패 {file_path}: {e}")
            return None
    
    def save_collected_data(self, data, output_dir):
        """수집된 데이터를 JSON으로 저장"""
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = os.path.join(output_dir, f"{self.data_name}_collected_{timestamp}.json")
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"수집된 데이터가 저장됨: {filename}")
        return filename

# 사용 예시
if __name__ == "__main__":
    from config.config import DATA_DIR
    
    # 판매 데이터 수집
    sales_collector = FileDataCollector('sales_data')
    sales_data = sales_collector.collect()
    
    # 수집된 데이터 저장
    collected_dir = os.path.join(DATA_DIR, 'collected', 'sales')
    sales_collector.save_collected_data(sales_data, collected_dir)

3. 데이터 처리 및 분석

수집된 데이터를 정제하고 분석하는 과정을 구현해 보겠습니다.

3.1 데이터 정제

# processors/cleaner.py
import pandas as pd
import numpy as np
from datetime import datetime
import json
from utils.logger import setup_logger

class DataCleaner:
    """수집된 데이터를 정제하는 클래스"""
    
    def __init__(self, source_name):
        """
        데이터 정제기 초기화
        
        Parameters:
        source_name (str): 데이터 소스 이름 ('weather', 'news', 'sales_data' 등)
        """
        self.logger = setup_logger(f'processor.cleaner.{source_name}')
        self.source_name = source_name
    
    def clean(self, data):
        """
        데이터 소스에 따른 정제 메서드를 호출
        
        Parameters:
        data (dict): 수집된 원시 데이터
        
        Returns:
        dict: 정제된 데이터
        """
        if data['status'] != 'success' or data['data'] is None:
            self.logger.warning(f"정제할 데이터가 없습니다: {data['status']}")
            return data
        
        self.logger.info(f"{self.source_name} 데이터 정제 시작")
        
        # 소스별 정제 메서드 호출
        if self.source_name == 'weather':
            cleaned_data = self._clean_weather_data(data)
        elif self.source_name == 'stock':
            cleaned_data = self._clean_stock_data(data)
        elif self.source_name == 'news':
            cleaned_data = self._clean_news_data(data)
        elif self.source_name == 'sales_data':
            cleaned_data = self._clean_sales_data(data)
        else:
            self.logger.warning(f"알 수 없는 데이터 소스: {self.source_name}")
            cleaned_data = data
        
        self.logger.info(f"{self.source_name} 데이터 정제 완료")
        return cleaned_data
    
    def _clean_weather_data(self, data):
        """날씨 데이터 정제"""
        try:
            raw_data = data['data']
            
            # 필요한 날씨 정보만 추출
            cleaned = {
                'city': raw_data.get('name', 'Unknown'),
                'country': raw_data.get('sys', {}).get('country', 'Unknown'),
                'temperature': raw_data.get('main', {}).get('temp'),
                'feels_like': raw_data.get('main', {}).get('feels_like'),
                'humidity': raw_data.get('main', {}).get('humidity'),
                'pressure': raw_data.get('main', {}).get('pressure'),
                'wind_speed': raw_data.get('wind', {}).get('speed'),
                'wind_direction': raw_data.get('wind', {}).get('deg'),
                'clouds': raw_data.get('clouds', {}).get('all'),
                'weather_main': raw_data.get('weather', [{}])[0].get('main'),
                'weather_description': raw_data.get('weather', [{}])[0].get('description'),
                'timestamp': raw_data.get('dt'),
                'sunrise': raw_data.get('sys', {}).get('sunrise'),
                'sunset': raw_data.get('sys', {}).get('sunset')
            }
            
            # 온도가 없는 경우 처리
            if cleaned['temperature'] is None:
                self.logger.warning("온도 데이터가 없습니다")
                cleaned['temperature'] = None
            
            # 타임스탬프를 ISO 형식으로 변환
            if cleaned['timestamp']:
                cleaned['datetime'] = datetime.fromtimestamp(cleaned['timestamp']).isoformat()
            else:
                cleaned['datetime'] = None
            
            # 정제된 데이터로 업데이트
            result = data.copy()
            result['data'] = cleaned
            result['processed_at'] = datetime.now().isoformat()
            
            return result
            
        except Exception as e:
            self.logger.error(f"날씨 데이터 정제 중 오류: {e}")
            data['status'] = 'error'
            data['error'] = f"정제 오류: {str(e)}"
            return data
    
    def _clean_news_data(self, data):
        """뉴스 데이터 정제"""
        try:
            raw_data = data['data']
            
            # 헤드라인 정제
            headlines = raw_data.get('headlines', [])
            cleaned_headlines = []
            
            for item in headlines:
                # 텍스트 정제 (공백 제거, 줄바꿈 정리 등)
                text = item.get('text', '').strip()
                
                # 빈 텍스트 건너뛰기
                if not text:
                    continue
                
                # URL 정규화
                link = item.get('link', '')
                if link and not (link.startswith('http://') or link.startswith('https://')):
                    if link.startswith('/'):
                        # 상대 경로인 경우 기본 URL 추가
                        base_url = data['url'].rstrip('/')
                        link = f"{base_url}{link}"
                
                cleaned_headlines.append({
                    'title': text,
                    'url': link
                })
            
            # 정제된 데이터로 업데이트
            result = data.copy()
            result['data'] = {
                'headlines': cleaned_headlines,
                'count': len(cleaned_headlines)
            }
            result['processed_at'] = datetime.now().isoformat()
            
            return result
            
        except Exception as e:
            self.logger.error(f"뉴스 데이터 정제 중 오류: {e}")
            data['status'] = 'error'
            data['error'] = f"정제 오류: {str(e)}"
            return data
    
    def _clean_sales_data(self, data):
        """판매 데이터 정제"""
        try:
            all_data = data['data']
            
            if not all_data:
                return data
            
            # 모든 파일의 데이터를 하나의 데이터프레임으로 통합
            df = pd.DataFrame()
            for file_data in all_data:
                if file_data:
                    temp_df = pd.DataFrame(file_data)
                    df = pd.concat([df, temp_df], ignore_index=True)
            
            if df.empty:
                self.logger.warning("정제할 판매 데이터가 없습니다")
                return data
            
            # 열 이름 표준화 (