정기적 데이터 수집
앞서 배운 SQLite 지식을 활용하여 실용적인 자동화 시스템을 구축하는 방법을 알아보겠습니다. 이 가이드에서는 정기적 데이터 수집, 자동 보고서 생성, 알림 시스템 구축에 초점을 맞춥니다.
자동화의 첫 단계는 데이터를 정기적으로 수집하는 것입니다. Python의 스케줄링 도구와 SQLite를 결합하여 효율적인 데이터 수집 시스템을 구축할 수 있습니다.
1.1 필요한 라이브러리 설치
pip install schedule requests pandas beautifulsoup4
1.2 기본 스케줄러 설정
import schedule
import time
import sqlite3
import requests
import pandas as pd
from datetime import datetime
def job():
print(f"데이터 수집 작업 실행: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 이 함수에 실제 데이터 수집 코드가 들어갑니다
# 매일 특정 시간에 실행
schedule.every().day.at("10:00").do(job)
# 매 시간마다 실행
schedule.every().hour.do(job)
# 매 5분마다 실행
schedule.every(5).minutes.do(job)
# 스케줄러 실행
while True:
schedule.run_pending()
time.sleep(1)
1.3 웹 API에서 데이터 수집 및 저장
def collect_weather_data():
"""날씨 API에서 데이터를 수집하여 SQLite DB에 저장"""
# API에서 데이터 가져오기
api_key = "your_api_key"
city = "Seoul"
url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}&units=metric"
try:
response = requests.get(url)
data = response.json()
# 필요한 데이터 추출
temp = data["main"]["temp"]
humidity = data["main"]["humidity"]
weather_desc = data["weather"][0]["description"]
wind_speed = data["wind"]["speed"]
collection_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# SQLite에 저장
conn = sqlite3.connect('weather_data.db')
cursor = conn.cursor()
# 테이블이 없으면 생성
cursor.execute('''
CREATE TABLE IF NOT EXISTS weather_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
city TEXT,
temperature REAL,
humidity INTEGER,
description TEXT,
wind_speed REAL,
collection_time TEXT
)
''')
# 데이터 삽입
cursor.execute('''
INSERT INTO weather_records
(city, temperature, humidity, description, wind_speed, collection_time)
VALUES (?, ?, ?, ?, ?, ?)
''', (city, temp, humidity, weather_desc, wind_speed, collection_time))
conn.commit()
conn.close()
print(f"날씨 데이터 수집 완료: {city}, {temp}°C, {collection_time}")
return True
except Exception as e:
print(f"날씨 데이터 수집 실패: {e}")
return False
# 스케줄러에 등록
schedule.every().hour.do(collect_weather_data)
1.4 웹 스크래핑으로 데이터 수집
from bs4 import BeautifulSoup
def scrape_news_headlines():
"""뉴스 웹사이트에서 헤드라인을 스크래핑하여 저장"""
try:
# 웹 페이지 가져오기
url = "https://news.naver.com/"
headers = {'User-Agent': 'Mozilla/5.0'}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, 'html.parser')
# 헤드라인 추출 (웹사이트 구조에 따라 선택자 조정 필요)
headlines = soup.select('.headline_list a')
# 데이터베이스 연결
conn = sqlite3.connect('news_data.db')
cursor = conn.cursor()
# 테이블이 없으면 생성
cursor.execute('''
CREATE TABLE IF NOT EXISTS news_headlines (
id INTEGER PRIMARY KEY AUTOINCREMENT,
headline TEXT,
url TEXT,
collection_time TEXT
)
''')
# 데이터 삽입
collection_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
for headline in headlines[:10]: # 상위 10개 헤드라인만 저장
title = headline.text.strip()
link = headline.get('href')
cursor.execute('''
INSERT INTO news_headlines (headline, url, collection_time)
VALUES (?, ?, ?)
''', (title, link, collection_time))
conn.commit()
conn.close()
print(f"뉴스 헤드라인 수집 완료: {collection_time}, {len(headlines)} 건")
return True
except Exception as e:
print(f"뉴스 헤드라인 수집 실패: {e}")
return False
# 매일 아침 9시에 실행
schedule.every().day.at("09:00").do(scrape_news_headlines)
1.5 실용적인 스케줄러 실행 방법
def run_scheduler():
"""스케줄러를 안정적으로 실행하는 함수"""
# 모든 작업 등록
schedule.every().hour.do(collect_weather_data)
schedule.every().day.at("09:00").do(scrape_news_headlines)
# 추가 작업들...
print("스케줄러가 시작되었습니다...")
try:
# 무한 루프로 실행
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
print("스케줄러가 중지되었습니다.")
except Exception as e:
print(f"오류 발생: {e}")
# 여기에 오류 보고 로직 추가 (이메일 전송 등)
# 스크립트 직접 실행 시에만 스케줄러 시작
if __name__ == "__main__":
run_scheduler()
자동 보고서 생성
수집된 데이터를 분석하고 보고서로 자동 생성하는 시스템을 구축해 보겠습니다.
2.1 데이터 분석 및 시각화
import matplotlib.pyplot as plt
import seaborn as sns
from fpdf import FPDF
import os
def generate_weather_report():
"""날씨 데이터를 분석하고 PDF 보고서 생성"""
# 데이터베이스에서 데이터 가져오기
conn = sqlite3.connect('weather_data.db')
# 최근 7일간의 날씨 데이터 조회
query = """
SELECT collection_time, temperature, humidity, wind_speed
FROM weather_records
WHERE city = 'Seoul'
AND collection_time >= date('now', '-7 days')
ORDER BY collection_time
"""
df = pd.read_sql_query(query, conn)
conn.close()
# 데이터 전처리
df['collection_time'] = pd.to_datetime(df['collection_time'])
df.set_index('collection_time', inplace=True)
# 일별 평균 계산
daily_avg = df.resample('D').mean()
# 그래프 생성
plt.figure(figsize=(12, 8))
# 온도 그래프
plt.subplot(2, 1, 1)
sns.lineplot(data=daily_avg, x=daily_avg.index.date, y='temperature')
plt.title('일평균 기온 (최근 7일)')
plt.ylabel('온도 (°C)')
# 습도 그래프
plt.subplot(2, 1, 2)
sns.lineplot(data=daily_avg, x=daily_avg.index.date, y='humidity')
plt.title('일평균 습도 (최근 7일)')
plt.ylabel('습도 (%)')
# 그래프 저장
report_date = datetime.now().strftime('%Y%m%d')
graph_file = f'weather_graph_{report_date}.png'
plt.tight_layout()
plt.savefig(graph_file)
plt.close()
# PDF 보고서 생성
pdf = FPDF()
pdf.add_page()
# 제목 추가
pdf.set_font('Arial', 'B', 16)
pdf.cell(190, 10, f'날씨 주간 보고서 ({report_date})', 0, 1, 'C')
# 그래프 추가
pdf.image(graph_file, x=10, y=30, w=190)
# 통계 정보 추가
pdf.set_font('Arial', '', 12)
pdf.ln(140) # 그래프 아래로 이동
stats_text = [
f"기간: {daily_avg.index.date.min()} ~ {daily_avg.index.date.max()}",
f"평균 기온: {daily_avg['temperature'].mean():.1f}°C",
f"최고 기온: {df['temperature'].max():.1f}°C",
f"최저 기온: {df['temperature'].min():.1f}°C",
f"평균 습도: {daily_avg['humidity'].mean():.1f}%",
f"평균 풍속: {daily_avg['wind_speed'].mean():.1f} m/s"
]
for text in stats_text:
pdf.cell(190, 10, text, 0, 1)
# PDF 저장
pdf_file = f'weather_report_{report_date}.pdf'
pdf.output(pdf_file)
# 임시 그래프 파일 삭제
os.remove(graph_file)
print(f"날씨 보고서가 생성되었습니다: {pdf_file}")
return pdf_file
# 매주 월요일 아침에 보고서 생성
schedule.every().monday.at("07:00").do(generate_weather_report)
2.2 복합 보고서 생성 (여러 데이터 소스 결합)
def generate_comprehensive_report():
"""날씨 데이터와 뉴스 데이터를 결합한 종합 보고서 생성"""
report_date = datetime.now().strftime('%Y%m%d')
# 1. 데이터베이스 연결
conn = sqlite3.connect('weather_data.db')
# 2. 날씨 데이터 쿼리
weather_df = pd.read_sql_query("""
SELECT collection_time, temperature, humidity, description
FROM weather_records
WHERE collection_time >= date('now', '-3 days')
ORDER BY collection_time
""", conn)
# 3. 뉴스 데이터 쿼리 (다른 DB에서)
news_conn = sqlite3.connect('news_data.db')
news_df = pd.read_sql_query("""
SELECT collection_time, headline
FROM news_headlines
WHERE collection_time >= date('now', '-3 days')
ORDER BY collection_time DESC
LIMIT 20
""", news_conn)
# 연결 닫기
conn.close()
news_conn.close()
# 4. 날씨 그래프 생성
plt.figure(figsize=(10, 6))
weather_df['collection_time'] = pd.to_datetime(weather_df['collection_time'])
plt.plot(weather_df['collection_time'], weather_df['temperature'], marker='o')
plt.title('최근 3일 기온 변화')
plt.xticks(rotation=45)
plt.tight_layout()
# 그래프 저장
weather_graph = f'temp_weather_{report_date}.png'
plt.savefig(weather_graph)
plt.close()
# 5. PDF 보고서 생성
pdf = FPDF()
# 첫 페이지 - 요약
pdf.add_page()
pdf.set_font('Arial', 'B', 16)
pdf.cell(190, 10, f'일일 종합 리포트 ({report_date})', 0, 1, 'C')
pdf.set_font('Arial', '', 12)
pdf.ln(10)
pdf.cell(190, 10, f'1. 날씨 요약', 0, 1)
pdf.cell(190, 10, f' - 평균 기온: {weather_df["temperature"].mean():.1f}°C', 0, 1)
pdf.cell(190, 10, f' - 최근 날씨 상태: {weather_df["description"].iloc[-1]}', 0, 1)
pdf.ln(10)
pdf.cell(190, 10, f'2. 주요 뉴스 헤드라인', 0, 1)
# 헤드라인 상위 5개만 표시
for i, row in news_df.head(5).iterrows():
headline = row['headline']
if len(headline) > 60:
headline = headline[:57] + '...'
pdf.cell(190, 10, f' - {headline}', 0, 1)
# 두 번째 페이지 - 날씨 그래프와 상세 데이터
pdf.add_page()
pdf.set_font('Arial', 'B', 14)
pdf.cell(190, 10, '날씨 상세 데이터', 0, 1, 'C')
# 그래프 추가
pdf.image(weather_graph, x=10, y=30, w=190)
# 세 번째 페이지 - 뉴스 전체 목록
pdf.add_page()
pdf.set_font('Arial', 'B', 14)
pdf.cell(190, 10, '최근 뉴스 헤드라인 (전체)', 0, 1, 'C')
pdf.set_font('Arial', '', 11)
pdf.ln(10)
for i, row in news_df.iterrows():
time_str = pd.to_datetime(row['collection_time']).strftime('%m-%d %H:%M')
headline = row['headline']
if len(headline) > 65:
headline = headline[:62] + '...'
pdf.cell(190, 8, f'{time_str} | {headline}', 0, 1)
# PDF 저장
report_file = f'daily_report_{report_date}.pdf'
pdf.output(report_file)
# 임시 파일 삭제
os.remove(weather_graph)
print(f"종합 보고서가 생성되었습니다: {report_file}")
return report_file
# 매일 오후 6시에 종합 보고서 생성
schedule.every().day.at("18:00").do(generate_comprehensive_report)
2.3 Excel 보고서 생성
def generate_excel_report():
"""수집된 데이터를 엑셀 보고서로 생성"""
report_date = datetime.now().strftime('%Y%m%d')
# DB에서 데이터 불러오기
conn = sqlite3.connect('weather_data.db')
weather_df = pd.read_sql_query("""
SELECT collection_time, city, temperature, humidity, description, wind_speed
FROM weather_records
WHERE collection_time >= date('now', '-30 days')
ORDER BY collection_time
""", conn)
conn.close()
# 시간 데이터 변환
weather_df['collection_time'] = pd.to_datetime(weather_df['collection_time'])
weather_df['date'] = weather_df['collection_time'].dt.date
weather_df['time'] = weather_df['collection_time'].dt.time
# 일별 집계 데이터 생성
daily_summary = weather_df.groupby('date').agg({
'temperature': ['mean', 'min', 'max'],
'humidity': ['mean', 'min', 'max'],
'wind_speed': ['mean', 'max']
})
# 엑셀 작성
excel_file = f'weather_report_{report_date}.xlsx'
with pd.ExcelWriter(excel_file, engine='xlsxwriter') as writer:
# 원본 데이터 시트
weather_df.to_excel(writer, sheet_name='Raw Data', index=False)
# 일별 요약 데이터 시트
daily_summary.to_excel(writer, sheet_name='Daily Summary')
# 차트 생성
workbook = writer.book
# 일별 평균 기온 차트
temp_chart_sheet = workbook.add_worksheet('Temperature Chart')
# 일별 데이터 별도 시트에 추가 (차트용)
chart_data = weather_df.groupby('date')['temperature'].mean().reset_index()
chart_data.to_excel(writer, sheet_name='Chart Data', index=False)
# 차트 생성
chart = workbook.add_chart({'type': 'line'})
chart.add_series({
'name': '일평균 기온',
'categories': '=Chart Data!$A$2:$A$32', # 날짜 범위
'values': '=Chart Data!$B$2:$B$32', # 온도 값 범위
})
chart.set_title({'name': '30일간 일평균 기온 추이'})
chart.set_x_axis({'name': '날짜'})
chart.set_y_axis({'name': '온도 (°C)'})
temp_chart_sheet.insert_chart('B2', chart, {'x_scale': 2, 'y_scale': 1})
print(f"엑셀 보고서가 생성되었습니다: {excel_file}")
return excel_file
# 매월 1일에 월간 보고서 생성
schedule.every().month.at("09:00").do(generate_excel_report)
알림 시스템 구축
자동화 시스템을 효과적으로 운영하기 위해 중요한 이벤트 발생 시 알림을 보내는 시스템을 구축해 보겠습니다.
3.1 이메일 알림 시스템
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
def send_email_alert(subject, body, recipients, attachment_path=None):
"""이메일 알림 전송 함수"""
# 이메일 설정 (실제 사용 시 환경변수나 설정 파일에서 로드)
sender_email = "your_email@gmail.com"
sender_password = "your_app_password" # Gmail은 앱 비밀번호 사용 필요
# 이메일 메시지 구성
msg = MIMEMultipart()
msg['From'] = sender_email
msg['To'] = ', '.join(recipients)
msg['Subject'] = subject
# 본문 추가
msg.attach(MIMEText(body, 'plain'))
# 첨부파일 추가 (있는 경우)
if attachment_path and os.path.exists(attachment_path):
with open(attachment_path, 'rb') as file:
attachment = MIMEApplication(file.read())
attachment.add_header(
'Content-Disposition',
'attachment',
filename=os.path.basename(attachment_path)
)
msg.attach(attachment)
try:
# SMTP 서버 연결 및 로그인
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls()
server.login(sender_email, sender_password)
# 이메일 전송
server.send_message(msg)
server.quit()
print(f"알림 이메일이 전송되었습니다: {subject}")
return True
except Exception as e:
print(f"이메일 전송 실패: {e}")
return False
# 보고서 생성 후 이메일 전송
def generate_and_send_report():
"""보고서 생성 후 이메일로 전송"""
try:
# 보고서 생성
report_file = generate_comprehensive_report()
# 이메일 전송
subject = f"일일 종합 보고서 ({datetime.now().strftime('%Y-%m-%d')})"
body = """
안녕하세요,
첨부된 파일은 오늘의 종합 보고서입니다.
- 날씨 데이터 요약
- 주요 뉴스 헤드라인
- 상세 분석 결과
문의사항이 있으시면 회신해 주세요.
감사합니다.
자동화 시스템
"""
recipients = ["recipient1@example.com", "recipient2@example.com"]
send_email_alert(subject, body, recipients, report_file)
return True
except Exception as e:
print(f"보고서 생성 및 전송 실패: {e}")
# 오류 알림 이메일 전송
error_subject = "보고서 생성 오류 발생"
error_body = f"보고서 생성 중 다음 오류가 발생했습니다:\n\n{str(e)}"
send_email_alert(error_subject, error_body, ["admin@example.com"])
return False
# 매일 오후 6시 30분에 보고서 생성 및 전송
schedule.every().day.at("18:30").do(generate_and_send_report)
3.2 특정 조건 기반 알림 시스템
def check_weather_alert():
"""특정 기상 조건 발생 시 알림 전송"""
conn = sqlite3.connect('weather_data.db')
cursor = conn.cursor()
# 최근 날씨 데이터 확인
cursor.execute("""
SELECT temperature, humidity, description
FROM weather_records
WHERE city = 'Seoul'
ORDER BY collection_time DESC
LIMIT 1
""")
result = cursor.fetchone()
conn.close()
if not result:
return False
temperature, humidity, description = result
alerts = []
# 알림 조건 확인
if temperature > 35:
alerts.append(f"폭염 주의: 현재 온도 {temperature}°C")
if temperature < 0:
alerts.append(f"영하 기온: 현재 온도 {temperature}°C")
# 특정 키워드 확인
weather_alerts = ["rain", "storm", "snow", "hurricane", "typhoon"]
for keyword in weather_alerts:
if keyword in description.lower():
alerts.append(f"기상 경보: {description}")
break
# 알림이 있는 경우 전송
if alerts:
alert_subject = "기상 특보 알림"
alert_body = "다음과 같은 주의보가 발령되었습니다:\n\n" + "\n".join(alerts)
# 관리자 및 사용자에게 알림
recipients = ["admin@example.com", "user@example.com"]
send_email_alert(alert_subject, alert_body, recipients)
print(f"기상 알림 전송됨: {', '.join(alerts)}")
return True
return False
# 매 시간마다 기상 알림 확인
schedule.every().hour.do(check_weather_alert)
3.3 시스템 상태 모니터링 및 알림
def check_system_health():
"""시스템 상태 확인 및 이상 시 알림"""
system_ok = True
errors = []
# 1. 데이터베이스 연결 확인
try:
conn = sqlite3.connect('weather_data.db')
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM weather_records")
count = cursor.fetchone()[0]
conn.close()
# 24시간 내 데이터가 없으면 경고
conn = sqlite3.connect('weather_data.db')
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*) FROM weather_records
WHERE collection_time >= datetime('now', '-24 hours')
""")
recent_count = cursor.fetchone()[0]
conn.close()
if recent_count == 0:
system_ok = False
errors.append("최근 24시간 내 수집된 날씨 데이터가 없습니다.")
except Exception as e:
system_ok = False
errors.append(f"데이터베이스 연결 오류: {e}")
# 2. 디스크 공간 확인
try:
total, used, free = shutil.disk_usage("/")
free_percent = (free / total) * 100
if free_percent < 10: # 10% 미만이면 경고
system_ok = False
errors.append(f"디스크 공간 부족: 남은 공간 {free_percent:.1f}%")
except Exception as e:
system_ok = False
errors.append(f"디스크 확인 오류: {e}")
# 3. 최근 스케줄 실행 확인
# (여기서는 예시로 파일을 통해 마지막 실행 시간을 기록/확인)
last_run_file = "last_run_timestamp.txt"
try:
current_time = datetime.now()
# 파일이 있으면 읽기
if os.path.exists(last_run_file):
with open(last_run_file, 'r') as f:
last_run_str = f.read().strip()
last_run = datetime.fromisoformat(last_run_str)
# 6시간 이상 실행되지 않았으면 경고
if (current_time - last_run).total_seconds() > 6 * 3600:
system_ok = False
errors.append(f"스케줄러 작업이 6시간 이상 실행되지 않았습니다. 마지막 실행: {last_run_str}")
else:
# 파일이 없으면 경고
system_ok = False
errors.append("스케줄러 작업 상태를 확인할 수 없습니다.")
# 현재 시간 기록
with open(last_run_file, 'w') as f:
f.write(current_time.isoformat())
except Exception as e:
system_ok = False
errors.append(f
'오픈소스를 위한 기초 상식' 카테고리의 다른 글
자동 리포트 생성 시스템 학습 자료 (0) | 2025.03.27 |
---|---|
데이터 수집-분석-저장 파이프라인 구축 가이드 (0) | 2025.03.26 |
SQLite 학습 가이드 (0) | 2025.03.24 |
실전 웹 스크래핑 프로젝트 학습자료 (0) | 2025.03.23 |
고급 웹 스크래핑 가이드 (0) | 2025.03.22 |