데이터 파이프라인은 데이터의 수집부터 분석, 저장까지의 전체 과정을 자동화하는 시스템입니다. 이 가이드에서는 Python과 SQLite를 활용하여 효율적인 데이터 파이프라인을 구축하는 방법을 단계별로 알아보겠습니다.
1. 파이프라인 아키텍처 설계
데이터 파이프라인은 크게 다음 세 단계로 구성됩니다:
- 데이터 수집: 다양한 소스에서 데이터를 가져옵니다.
- 데이터 분석/변환: 수집된 데이터를 처리하고 분석합니다.
- 데이터 저장: 분석된 데이터를 저장하고 필요할 때 접근할 수 있게 합니다.
파이프라인을 구축하기 위한 기본 프로젝트 구조는 다음과 같습니다:
data_pipeline/
│
├── config/
│ └── config.py # 설정 파일
│
├── collectors/
│ ├── __init__.py
│ ├── api_collector.py # API 데이터 수집기
│ ├── web_scraper.py # 웹 스크래핑 수집기
│ └── file_reader.py # 파일 데이터 수집기
│
├── processors/
│ ├── __init__.py
│ ├── cleaner.py # 데이터 정제
│ ├── transformer.py # 데이터 변환
│ └── analyzer.py # 데이터 분석
│
├── storage/
│ ├── __init__.py
│ ├── sqlite_manager.py # SQLite 관리
│ └── file_manager.py # 파일 저장 관리
│
├── utils/
│ ├── __init__.py
│ ├── logger.py # 로깅 유틸리티
│ └── helpers.py # 도우미 함수
│
├── pipeline.py # 파이프라인 메인 실행 파일
└── requirements.txt # 필요한 패키지 목록
2. 데이터 수집 구현
다양한 소스에서 데이터를 수집하는 방법을 살펴보겠습니다.
2.1 설정 파일 구성
# config/config.py
import os
from datetime import datetime
# 기본 경로 설정
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
DATA_DIR = os.path.join(BASE_DIR, 'data')
LOG_DIR = os.path.join(BASE_DIR, 'logs')
# 데이터베이스 설정
DATABASE = {
'name': os.path.join(DATA_DIR, 'pipeline_data.db'),
'backup_dir': os.path.join(DATA_DIR, 'backups')
}
# API 설정
API_CONFIG = {
'weather': {
'url': 'https://api.openweathermap.org/data/2.5/weather',
'key': 'your_api_key',
'params': {
'q': 'Seoul',
'units': 'metric'
}
},
'stock': {
'url': 'https://api.alphavantage.co/query',
'key': 'your_api_key',
'params': {
'function': 'TIME_SERIES_DAILY',
'symbol': 'AAPL'
}
}
}
# 웹 스크래핑 설정
SCRAPING_CONFIG = {
'news': {
'url': 'https://news.naver.com/',
'selectors': {
'headlines': '.headline_list a',
'content': '.content'
}
}
}
# 파일 데이터 설정
FILE_CONFIG = {
'sales_data': {
'path': os.path.join(DATA_DIR, 'sales'),
'format': 'csv',
'encoding': 'utf-8'
}
}
# 로깅 설정
LOG_CONFIG = {
'filename': os.path.join(LOG_DIR, f'pipeline_{datetime.now().strftime("%Y%m%d")}.log'),
'level': 'INFO'
}
# 각 디렉토리가 존재하지 않으면 생성
for directory in [DATA_DIR, LOG_DIR, DATABASE['backup_dir']]:
if not os.path.exists(directory):
os.makedirs(directory)
2.2 로깅 설정
# utils/logger.py
import logging
import os
from config.config import LOG_CONFIG
def setup_logger(name='pipeline'):
"""로깅 설정"""
# 로그 디렉토리 확인
log_dir = os.path.dirname(LOG_CONFIG['filename'])
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# 로거 설정
logger = logging.getLogger(name)
logger.setLevel(getattr(logging, LOG_CONFIG['level']))
# 이미 핸들러가 있으면 추가하지 않음
if not logger.handlers:
# 파일 핸들러
file_handler = logging.FileHandler(LOG_CONFIG['filename'])
file_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_format)
logger.addHandler(file_handler)
# 콘솔 핸들러
console_handler = logging.StreamHandler()
console_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_format)
logger.addHandler(console_handler)
return logger
2.3 API 데이터 수집기
# collectors/api_collector.py
import requests
import json
from datetime import datetime
import os
from config.config import API_CONFIG
from utils.logger import setup_logger
class APICollector:
"""API에서 데이터를 수집하는 클래스"""
def __init__(self, api_name):
"""
API 수집기 초기화
Parameters:
api_name (str): 설정 파일에 정의된 API 이름 ('weather', 'stock' 등)
"""
self.logger = setup_logger(f'collector.api.{api_name}')
if api_name not in API_CONFIG:
raise ValueError(f"알 수 없는 API: {api_name}. 유효한 API: {list(API_CONFIG.keys())}")
self.api_name = api_name
self.config = API_CONFIG[api_name]
self.base_url = self.config['url']
self.api_key = self.config['key']
self.params = self.config['params'].copy()
self.logger.info(f"{api_name} API 수집기 초기화됨")
def collect(self, additional_params=None):
"""
API에서 데이터를 수집
Parameters:
additional_params (dict, optional): 기본 파라미터에 추가할 파라미터
Returns:
dict: 수집된 데이터, 수집 시간, 수집 소스 정보
"""
try:
# 파라미터 설정
params = self.params.copy()
if additional_params:
params.update(additional_params)
# API 키 추가
params['appid' if self.api_name == 'weather' else 'apikey'] = self.api_key
# API 요청
self.logger.info(f"{self.api_name} API 요청 시작: {self.base_url}")
response = requests.get(self.base_url, params=params)
response.raise_for_status() # HTTP 에러 발생 시 예외 발생
# 응답 처리
data = response.json()
# 메타데이터 추가
result = {
'data': data,
'source': self.api_name,
'collected_at': datetime.now().isoformat(),
'status': 'success'
}
self.logger.info(f"{self.api_name} 데이터 수집 성공")
return result
except requests.exceptions.RequestException as e:
self.logger.error(f"{self.api_name} API 요청 실패: {e}")
return {
'data': None,
'source': self.api_name,
'collected_at': datetime.now().isoformat(),
'status': 'error',
'error': str(e)
}
def save_raw_data(self, data, output_dir):
"""수집된 원시 데이터를 파일로 저장"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(output_dir, f"{self.api_name}_{timestamp}.json")
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
self.logger.info(f"원시 데이터가 저장됨: {filename}")
return filename
# 사용 예시
if __name__ == "__main__":
from config.config import DATA_DIR
# 날씨 데이터 수집
weather_collector = APICollector('weather')
weather_data = weather_collector.collect()
# 원시 데이터 저장
raw_dir = os.path.join(DATA_DIR, 'raw', 'weather')
weather_collector.save_raw_data(weather_data, raw_dir)
2.4 웹 스크래핑 수집기
# collectors/web_scraper.py
import requests
from bs4 import BeautifulSoup
import json
import os
from datetime import datetime
from config.config import SCRAPING_CONFIG
from utils.logger import setup_logger
class WebScraper:
"""웹사이트에서 데이터를 스크래핑하는 클래스"""
def __init__(self, scraper_name):
"""
웹 스크래퍼 초기화
Parameters:
scraper_name (str): 설정 파일에 정의된 스크래퍼 이름 ('news' 등)
"""
self.logger = setup_logger(f'collector.scraper.{scraper_name}')
if scraper_name not in SCRAPING_CONFIG:
raise ValueError(f"알 수 없는 스크래퍼: {scraper_name}. 유효한 스크래퍼: {list(SCRAPING_CONFIG.keys())}")
self.scraper_name = scraper_name
self.config = SCRAPING_CONFIG[scraper_name]
self.url = self.config['url']
self.selectors = self.config['selectors']
self.logger.info(f"{scraper_name} 웹 스크래퍼 초기화됨")
def collect(self, url=None):
"""
웹사이트에서 데이터를 스크래핑
Parameters:
url (str, optional): 스크래핑할 대체 URL
Returns:
dict: 수집된 데이터, 수집 시간, 수집 소스 정보
"""
try:
target_url = url if url else self.url
# 헤더 설정으로 차단 방지
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
self.logger.info(f"{self.scraper_name} 웹 스크래핑 시작: {target_url}")
response = requests.get(target_url, headers=headers)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 스크래핑 결과 저장
result_data = {}
# 각 선택자에 해당하는 데이터 추출
for name, selector in self.selectors.items():
elements = soup.select(selector)
if name == 'headlines':
# 헤드라인은 텍스트와 링크 모두 추출
result_data[name] = [
{
'text': el.get_text(strip=True),
'link': el.get('href', '')
} for el in elements if el.get_text(strip=True)
]
else:
# 일반 텍스트 데이터
result_data[name] = [el.get_text(strip=True) for el in elements if el.get_text(strip=True)]
# 메타데이터 추가
result = {
'data': result_data,
'source': self.scraper_name,
'url': target_url,
'collected_at': datetime.now().isoformat(),
'status': 'success'
}
self.logger.info(f"{self.scraper_name} 데이터 스크래핑 성공: {len(result_data.get('headlines', []))} 헤드라인")
return result
except Exception as e:
self.logger.error(f"{self.scraper_name} 웹 스크래핑 실패: {e}")
return {
'data': None,
'source': self.scraper_name,
'url': url if url else self.url,
'collected_at': datetime.now().isoformat(),
'status': 'error',
'error': str(e)
}
def save_raw_data(self, data, output_dir):
"""스크래핑된 원시 데이터를 파일로 저장"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(output_dir, f"{self.scraper_name}_{timestamp}.json")
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
self.logger.info(f"원시 데이터가 저장됨: {filename}")
return filename
# 사용 예시
if __name__ == "__main__":
from config.config import DATA_DIR
# 뉴스 데이터 스크래핑
news_scraper = WebScraper('news')
news_data = news_scraper.collect()
# 원시 데이터 저장
raw_dir = os.path.join(DATA_DIR, 'raw', 'news')
news_scraper.save_raw_data(news_data, raw_dir)
2.5 파일 데이터 수집기
# collectors/file_reader.py
import os
import glob
import pandas as pd
import json
from datetime import datetime
from config.config import FILE_CONFIG
from utils.logger import setup_logger
class FileDataCollector:
"""파일에서 데이터를 수집하는 클래스"""
def __init__(self, data_name):
"""
파일 데이터 수집기 초기화
Parameters:
data_name (str): 설정 파일에 정의된 데이터 이름 ('sales_data' 등)
"""
self.logger = setup_logger(f'collector.file.{data_name}')
if data_name not in FILE_CONFIG:
raise ValueError(f"알 수 없는 데이터 소스: {data_name}. 유효한 소스: {list(FILE_CONFIG.keys())}")
self.data_name = data_name
self.config = FILE_CONFIG[data_name]
self.path = self.config['path']
self.format = self.config['format'].lower()
self.encoding = self.config.get('encoding', 'utf-8')
self.logger.info(f"{data_name} 파일 데이터 수집기 초기화됨")
def collect(self, file_pattern=None):
"""
파일에서 데이터를 수집
Parameters:
file_pattern (str, optional): 특정 파일 패턴 (예: "sales_*.csv")
Returns:
dict: 수집된 데이터, 수집 시간, 수집 소스 정보
"""
try:
# 파일 패턴 설정
if file_pattern:
search_pattern = os.path.join(self.path, file_pattern)
else:
search_pattern = os.path.join(self.path, f"*.{self.format}")
# 파일 목록 가져오기
files = glob.glob(search_pattern)
if not files:
self.logger.warning(f"패턴과 일치하는 파일 없음: {search_pattern}")
return {
'data': None,
'source': self.data_name,
'pattern': search_pattern,
'collected_at': datetime.now().isoformat(),
'status': 'warning',
'message': '일치하는 파일 없음'
}
self.logger.info(f"{len(files)} 개의 파일 발견: {search_pattern}")
# 모든 파일에서 데이터 수집
all_data = []
file_metadata = []
for file_path in files:
self.logger.info(f"파일 읽는 중: {file_path}")
file_data = self._read_file(file_path)
if file_data is not None:
all_data.append(file_data)
# 파일 메타데이터 추가
file_info = {
'path': file_path,
'filename': os.path.basename(file_path),
'size': os.path.getsize(file_path),
'modified': datetime.fromtimestamp(os.path.getmtime(file_path)).isoformat()
}
file_metadata.append(file_info)
# 메타데이터 추가
result = {
'data': all_data,
'source': self.data_name,
'files': file_metadata,
'collected_at': datetime.now().isoformat(),
'status': 'success',
'file_count': len(all_data)
}
self.logger.info(f"{self.data_name} 파일 데이터 수집 성공")
return result
except Exception as e:
self.logger.error(f"{self.data_name} 파일 데이터 수집 실패: {e}")
return {
'data': None,
'source': self.data_name,
'pattern': search_pattern if 'search_pattern' in locals() else None,
'collected_at': datetime.now().isoformat(),
'status': 'error',
'error': str(e)
}
def _read_file(self, file_path):
"""파일 형식에 따라 적절한 방법으로 파일 읽기"""
try:
if self.format == 'csv':
df = pd.read_csv(file_path, encoding=self.encoding)
return df.to_dict(orient='records')
elif self.format == 'excel' or file_path.endswith(('.xlsx', '.xls')):
df = pd.read_excel(file_path)
return df.to_dict(orient='records')
elif self.format == 'json':
with open(file_path, 'r', encoding=self.encoding) as f:
return json.load(f)
elif self.format == 'txt':
with open(file_path, 'r', encoding=self.encoding) as f:
return f.readlines()
else:
self.logger.warning(f"지원되지 않는 파일 형식: {self.format}")
return None
except Exception as e:
self.logger.error(f"파일 읽기 실패 {file_path}: {e}")
return None
def save_collected_data(self, data, output_dir):
"""수집된 데이터를 JSON으로 저장"""
if not os.path.exists(output_dir):
os.makedirs(output_dir)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.join(output_dir, f"{self.data_name}_collected_{timestamp}.json")
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
self.logger.info(f"수집된 데이터가 저장됨: {filename}")
return filename
# 사용 예시
if __name__ == "__main__":
from config.config import DATA_DIR
# 판매 데이터 수집
sales_collector = FileDataCollector('sales_data')
sales_data = sales_collector.collect()
# 수집된 데이터 저장
collected_dir = os.path.join(DATA_DIR, 'collected', 'sales')
sales_collector.save_collected_data(sales_data, collected_dir)
3. 데이터 처리 및 분석
수집된 데이터를 정제하고 분석하는 과정을 구현해 보겠습니다.
3.1 데이터 정제
# processors/cleaner.py
import pandas as pd
import numpy as np
from datetime import datetime
import json
from utils.logger import setup_logger
class DataCleaner:
"""수집된 데이터를 정제하는 클래스"""
def __init__(self, source_name):
"""
데이터 정제기 초기화
Parameters:
source_name (str): 데이터 소스 이름 ('weather', 'news', 'sales_data' 등)
"""
self.logger = setup_logger(f'processor.cleaner.{source_name}')
self.source_name = source_name
def clean(self, data):
"""
데이터 소스에 따른 정제 메서드를 호출
Parameters:
data (dict): 수집된 원시 데이터
Returns:
dict: 정제된 데이터
"""
if data['status'] != 'success' or data['data'] is None:
self.logger.warning(f"정제할 데이터가 없습니다: {data['status']}")
return data
self.logger.info(f"{self.source_name} 데이터 정제 시작")
# 소스별 정제 메서드 호출
if self.source_name == 'weather':
cleaned_data = self._clean_weather_data(data)
elif self.source_name == 'stock':
cleaned_data = self._clean_stock_data(data)
elif self.source_name == 'news':
cleaned_data = self._clean_news_data(data)
elif self.source_name == 'sales_data':
cleaned_data = self._clean_sales_data(data)
else:
self.logger.warning(f"알 수 없는 데이터 소스: {self.source_name}")
cleaned_data = data
self.logger.info(f"{self.source_name} 데이터 정제 완료")
return cleaned_data
def _clean_weather_data(self, data):
"""날씨 데이터 정제"""
try:
raw_data = data['data']
# 필요한 날씨 정보만 추출
cleaned = {
'city': raw_data.get('name', 'Unknown'),
'country': raw_data.get('sys', {}).get('country', 'Unknown'),
'temperature': raw_data.get('main', {}).get('temp'),
'feels_like': raw_data.get('main', {}).get('feels_like'),
'humidity': raw_data.get('main', {}).get('humidity'),
'pressure': raw_data.get('main', {}).get('pressure'),
'wind_speed': raw_data.get('wind', {}).get('speed'),
'wind_direction': raw_data.get('wind', {}).get('deg'),
'clouds': raw_data.get('clouds', {}).get('all'),
'weather_main': raw_data.get('weather', [{}])[0].get('main'),
'weather_description': raw_data.get('weather', [{}])[0].get('description'),
'timestamp': raw_data.get('dt'),
'sunrise': raw_data.get('sys', {}).get('sunrise'),
'sunset': raw_data.get('sys', {}).get('sunset')
}
# 온도가 없는 경우 처리
if cleaned['temperature'] is None:
self.logger.warning("온도 데이터가 없습니다")
cleaned['temperature'] = None
# 타임스탬프를 ISO 형식으로 변환
if cleaned['timestamp']:
cleaned['datetime'] = datetime.fromtimestamp(cleaned['timestamp']).isoformat()
else:
cleaned['datetime'] = None
# 정제된 데이터로 업데이트
result = data.copy()
result['data'] = cleaned
result['processed_at'] = datetime.now().isoformat()
return result
except Exception as e:
self.logger.error(f"날씨 데이터 정제 중 오류: {e}")
data['status'] = 'error'
data['error'] = f"정제 오류: {str(e)}"
return data
def _clean_news_data(self, data):
"""뉴스 데이터 정제"""
try:
raw_data = data['data']
# 헤드라인 정제
headlines = raw_data.get('headlines', [])
cleaned_headlines = []
for item in headlines:
# 텍스트 정제 (공백 제거, 줄바꿈 정리 등)
text = item.get('text', '').strip()
# 빈 텍스트 건너뛰기
if not text:
continue
# URL 정규화
link = item.get('link', '')
if link and not (link.startswith('http://') or link.startswith('https://')):
if link.startswith('/'):
# 상대 경로인 경우 기본 URL 추가
base_url = data['url'].rstrip('/')
link = f"{base_url}{link}"
cleaned_headlines.append({
'title': text,
'url': link
})
# 정제된 데이터로 업데이트
result = data.copy()
result['data'] = {
'headlines': cleaned_headlines,
'count': len(cleaned_headlines)
}
result['processed_at'] = datetime.now().isoformat()
return result
except Exception as e:
self.logger.error(f"뉴스 데이터 정제 중 오류: {e}")
data['status'] = 'error'
data['error'] = f"정제 오류: {str(e)}"
return data
def _clean_sales_data(self, data):
"""판매 데이터 정제"""
try:
all_data = data['data']
if not all_data:
return data
# 모든 파일의 데이터를 하나의 데이터프레임으로 통합
df = pd.DataFrame()
for file_data in all_data:
if file_data:
temp_df = pd.DataFrame(file_data)
df = pd.concat([df, temp_df], ignore_index=True)
if df.empty:
self.logger.warning("정제할 판매 데이터가 없습니다")
return data
# 열 이름 표준화 (
'오픈소스를 위한 기초 상식' 카테고리의 다른 글
데이터 대시보드 제작 가이드 (0) | 2025.03.28 |
---|---|
자동 리포트 생성 시스템 학습 자료 (0) | 2025.03.27 |
SQLite 기반 자동화 시스템 구축 가이드 (0) | 2025.03.25 |
SQLite 학습 가이드 (0) | 2025.03.24 |
실전 웹 스크래핑 프로젝트 학습자료 (0) | 2025.03.23 |