first commit

This commit is contained in:
salvacybersec
2025-11-13 03:25:21 +03:00
commit abe170a1f8
21 changed files with 2198 additions and 0 deletions

2
src/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# YouTube Transcript RSS Feed Generator

174
src/database.py Normal file
View File

@@ -0,0 +1,174 @@
"""
SQLite veritabanı yönetimi modülü
"""
import sqlite3
import os
from datetime import datetime, timezone
from typing import Optional, List, Dict
class Database:
"""SQLite veritabanı yönetim sınıfı"""
def __init__(self, db_path: str = "data/videos.db"):
self.db_path = db_path
self.conn = None
def connect(self):
"""Veritabanı bağlantısı oluştur"""
# Dizin yoksa oluştur
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
return self.conn
def init_database(self):
"""Veritabanı şemasını oluştur"""
conn = self.connect()
cursor = conn.cursor()
# Channels tablosu
cursor.execute("""
CREATE TABLE IF NOT EXISTS channels (
channel_id TEXT PRIMARY KEY,
channel_name TEXT,
channel_url TEXT,
last_checked_utc TEXT,
created_at_utc TEXT DEFAULT (datetime('now'))
)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_channels_last_checked
ON channels(last_checked_utc)
""")
# Videos tablosu
cursor.execute("""
CREATE TABLE IF NOT EXISTS videos (
video_id TEXT PRIMARY KEY,
channel_id TEXT,
video_title TEXT,
video_url TEXT,
published_at_utc TEXT,
processed_at_utc TEXT,
transcript_status INTEGER DEFAULT 0,
transcript_language TEXT,
transcript_raw TEXT,
transcript_clean TEXT,
last_updated_utc TEXT DEFAULT (datetime('now'))
)
""")
# Index'ler
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_videos_channel_id
ON videos(channel_id)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_videos_published_at_utc
ON videos(published_at_utc)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_videos_transcript_status
ON videos(transcript_status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_videos_processed_at_utc
ON videos(processed_at_utc)
""")
conn.commit()
print("Database initialized successfully")
def close(self):
"""Veritabanı bağlantısını kapat"""
if self.conn:
self.conn.close()
def is_video_processed(self, video_id: str) -> bool:
"""Video işlenmiş mi kontrol et"""
cursor = self.conn.cursor()
cursor.execute("SELECT video_id FROM videos WHERE video_id = ?", (video_id,))
return cursor.fetchone() is not None
def get_pending_videos(self) -> List[Dict]:
"""İşlenmeyi bekleyen videoları getir (status=0)"""
cursor = self.conn.cursor()
cursor.execute("""
SELECT * FROM videos
WHERE transcript_status = 0
ORDER BY published_at_utc DESC
""")
return [dict(row) for row in cursor.fetchall()]
def add_video(self, video_data: Dict):
"""Yeni video ekle (status=0 olarak)"""
cursor = self.conn.cursor()
cursor.execute("""
INSERT OR IGNORE INTO videos
(video_id, channel_id, video_title, video_url, published_at_utc, transcript_status)
VALUES (?, ?, ?, ?, ?, 0)
""", (
video_data['video_id'],
video_data.get('channel_id'),
video_data.get('video_title'),
video_data.get('video_url'),
video_data.get('published_at_utc')
))
self.conn.commit()
def update_video_transcript(self, video_id: str, raw: str, clean: str,
status: int, language: Optional[str] = None):
"""Video transcript'ini güncelle"""
cursor = self.conn.cursor()
now_utc = datetime.now(timezone.utc).isoformat()
cursor.execute("""
UPDATE videos
SET transcript_raw = ?,
transcript_clean = ?,
transcript_status = ?,
transcript_language = ?,
processed_at_utc = ?,
last_updated_utc = ?
WHERE video_id = ?
""", (raw, clean, status, language, now_utc, now_utc, video_id))
self.conn.commit()
def get_processed_videos(self, limit: Optional[int] = None,
channel_id: Optional[str] = None) -> List[Dict]:
"""İşlenmiş videoları getir (status=1)"""
cursor = self.conn.cursor()
query = """
SELECT * FROM videos
WHERE transcript_status = 1
"""
params = []
if channel_id:
query += " AND channel_id = ?"
params.append(channel_id)
query += " ORDER BY published_at_utc DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
cursor.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
def mark_video_failed(self, video_id: str, reason: Optional[str] = None):
"""Video'yu başarısız olarak işaretle (status=2)"""
cursor = self.conn.cursor()
cursor.execute("""
UPDATE videos
SET transcript_status = 2,
last_updated_utc = ?
WHERE video_id = ?
""", (datetime.now(timezone.utc).isoformat(), video_id))
self.conn.commit()

78
src/rss_generator.py Normal file
View File

@@ -0,0 +1,78 @@
"""
RSS feed oluşturma modülü
"""
from feedgen.feed import FeedGenerator
from datetime import datetime
from typing import List, Dict
import pytz
class RSSGenerator:
"""RSS feed generator sınıfı"""
def __init__(self, channel_info: Dict):
"""
Args:
channel_info: Kanal bilgileri (title, link, description, language)
"""
self.fg = FeedGenerator()
self.channel_info = channel_info
self._setup_channel()
def _setup_channel(self):
"""Channel metadata ayarla"""
self.fg.id(self.channel_info.get('id', ''))
self.fg.title(self.channel_info.get('title', ''))
self.fg.link(href=self.channel_info.get('link', ''))
self.fg.description(self.channel_info.get('description', ''))
self.fg.language(self.channel_info.get('language', 'en'))
self.fg.lastBuildDate(datetime.now(pytz.UTC))
def add_video_entry(self, video: Dict):
"""
Video entry ekle
Args:
video: Video metadata dict
"""
fe = self.fg.add_entry()
# GUID (video ID)
fe.id(video['video_id'])
# Title
fe.title(video.get('video_title', ''))
# Link
fe.link(href=video.get('video_url', ''))
# Published date (timezone-aware)
if video.get('published_at_utc'):
try:
pub_date = datetime.fromisoformat(
video['published_at_utc'].replace('Z', '+00:00')
)
fe.published(pub_date)
except:
pass
# Description (kısa özet)
fe.description(video.get('description', '')[:200])
# Content (tam transcript)
if video.get('transcript_clean'):
fe.content(content=video['transcript_clean'])
def generate_rss(self, output_path: str):
"""RSS feed'i dosyaya yaz"""
self.fg.rss_file(output_path, pretty=True, extensions=True)
print(f"RSS feed generated: {output_path}")
def generate_rss_string(self) -> str:
"""RSS feed'i string olarak döndür"""
return self.fg.rss_str(pretty=True, extensions=True)
def generate_atom_string(self) -> str:
"""Atom feed'i string olarak döndür"""
return self.fg.atom_str(pretty=True)

136
src/transcript_cleaner.py Normal file
View File

@@ -0,0 +1,136 @@
"""
Transcript temizleme ve NLP işleme modülü
"""
import re
import html
import spacy
from typing import List, Dict
class TranscriptCleaner:
"""Transcript temizleme ve SBD sınıfı"""
def __init__(self, model_name: str = "en_core_web_sm"):
"""
Args:
model_name: SpaCy model adı
"""
try:
self.nlp = spacy.load(model_name)
except OSError:
print(f"Model {model_name} not found. Loading default...")
self.nlp = spacy.load("en_core_web_sm")
def remove_artifacts(self, text: str) -> str:
"""Artifact'ları kaldır"""
# Zaman kodlarını kaldır [00:00:00]
text = re.sub(r'\[\d{2}:\d{2}:\d{2}\]', '', text)
# Konuşma dışı etiketleri kaldır
text = re.sub(r'\[(Music|Applause|Laughter|Music playing)\]', '', text, flags=re.IGNORECASE)
# Aşırı boşlukları temizle
text = re.sub(r'\s+', ' ', text)
return text.strip()
def detect_sentence_boundaries(self, fragments: List[Dict]) -> List[str]:
"""
Fragment'ları birleştir ve cümle sınırlarını tespit et
Args:
fragments: Transcript fragment listesi [{"text": "...", "start": 0.0, ...}]
Returns:
Cümle listesi
"""
# Fragment'ları birleştir
full_text = ' '.join([f['text'] for f in fragments])
# Artifact'ları kaldır
full_text = self.remove_artifacts(full_text)
# SpaCy ile işle
doc = self.nlp(full_text)
# Cümleleri çıkar
sentences = [sent.text.strip() for sent in doc.sents if sent.text.strip()]
return sentences
def create_paragraphs(self, sentences: List[str],
sentences_per_paragraph: int = 3) -> List[str]:
"""
Cümleleri paragraflara böl
Args:
sentences: Cümle listesi
sentences_per_paragraph: Paragraf başına cümle sayısı
Returns:
Paragraf listesi
"""
paragraphs = []
current_paragraph = []
for sentence in sentences:
current_paragraph.append(sentence)
if len(current_paragraph) >= sentences_per_paragraph:
paragraphs.append(' '.join(current_paragraph))
current_paragraph = []
# Kalan cümleleri ekle
if current_paragraph:
paragraphs.append(' '.join(current_paragraph))
return paragraphs
def wrap_html(self, paragraphs: List[str]) -> str:
"""Paragrafları HTML'e sar"""
html_paragraphs = [f"<p>{p}</p>" for p in paragraphs]
return '\n'.join(html_paragraphs)
def escape_xml_entities(self, text: str) -> str:
"""XML entity escaping (kritik!)"""
# Önce & karakterlerini escape et (diğerlerinden önce!)
text = text.replace('&', '&amp;')
text = text.replace('<', '&lt;')
text = text.replace('>', '&gt;')
text = text.replace('"', '&quot;')
text = text.replace("'", '&apos;')
# &amp;'yi tekrar düzelt (zaten escape edilmiş olanlar için)
text = text.replace('&amp;amp;', '&amp;')
text = text.replace('&amp;lt;', '&lt;')
text = text.replace('&amp;gt;', '&gt;')
text = text.replace('&amp;quot;', '&quot;')
text = text.replace('&amp;apos;', '&apos;')
return text
def clean_transcript(self, fragments: List[Dict],
sentences_per_paragraph: int = 3) -> tuple[str, str]:
"""
Tam transcript temizleme pipeline'ı
Returns:
(raw_text, clean_html) tuple
"""
# Raw text (sadece birleştirilmiş)
raw_text = ' '.join([f['text'] for f in fragments])
# SBD ile cümleleri çıkar
sentences = self.detect_sentence_boundaries(fragments)
# Paragraflara böl
paragraphs = self.create_paragraphs(sentences, sentences_per_paragraph)
# HTML'e sar
html_content = self.wrap_html(paragraphs)
# XML entity escaping
clean_html = self.escape_xml_entities(html_content)
return raw_text, clean_html

View File

@@ -0,0 +1,69 @@
"""
YouTube transcript çıkarımı modülü
"""
from youtube_transcript_api import YouTubeTranscriptApi
from typing import List, Dict, Optional
import time
class TranscriptExtractor:
"""YouTube transcript çıkarıcı sınıfı"""
def __init__(self, rate_limit: int = 5, time_window: int = 10):
"""
Args:
rate_limit: Zaman penceresi başına maksimum istek sayısı
time_window: Zaman penceresi (saniye)
"""
self.rate_limit = rate_limit
self.time_window = time_window
self.request_times = []
def _check_rate_limit(self):
"""Rate limiting kontrolü (basit implementasyon)"""
now = time.time()
# Son time_window saniyesindeki istekleri filtrele
self.request_times = [t for t in self.request_times if now - t < self.time_window]
# Rate limit aşıldıysa bekle
if len(self.request_times) >= self.rate_limit:
sleep_time = self.time_window - (now - self.request_times[0])
if sleep_time > 0:
time.sleep(sleep_time)
# Tekrar filtrele
now = time.time()
self.request_times = [t for t in self.request_times if now - t < self.time_window]
# İstek zamanını kaydet
self.request_times.append(time.time())
def fetch_transcript(self, video_id: str,
languages: List[str] = ['en']) -> Optional[List[Dict]]:
"""
Transcript çıkar (sync)
Args:
video_id: YouTube video ID
languages: Öncelik sırasına göre dil listesi
Returns:
Transcript listesi veya None
"""
# Rate limiting kontrolü
self._check_rate_limit()
try:
# YouTube Transcript API kullanımı (yeni versiyon)
# API instance oluştur ve fetch() metodunu kullan
api = YouTubeTranscriptApi()
fetched_transcript = api.fetch(video_id, languages=languages)
# Eski formatı döndürmek için to_raw_data() kullan
# Format: [{'text': '...', 'start': 1.36, 'duration': 1.68}, ...]
transcript = fetched_transcript.to_raw_data()
return transcript
except Exception as e:
print(f"Error fetching transcript for {video_id}: {e}")
return None

105
src/video_fetcher.py Normal file
View File

@@ -0,0 +1,105 @@
"""
RSS-Bridge kullanarak video metadata çıkarımı
"""
import feedparser
import re
import requests
from urllib.parse import urlencode
from typing import List, Dict, Optional
from datetime import datetime
def get_channel_id_from_handle(handle_url: str) -> Optional[str]:
"""
Channel handle URL'inden Channel ID'yi web scraping ile bulur.
Örnek: https://www.youtube.com/@tavakfi -> UC...
"""
try:
response = requests.get(handle_url)
response.raise_for_status()
html_content = response.text
# İlk pattern: "externalId":"UC..."
match = re.search(r'"externalId":"(UC[a-zA-Z0-9_-]{22})"', html_content)
if match:
return match.group(1)
# Alternatif pattern: "channelId":"UC..."
match_alt = re.search(r'"channelId":"(UC[a-zA-Z0-9_-]{22})"', html_content)
if match_alt:
return match_alt.group(1)
return None
except requests.exceptions.RequestException as e:
raise Exception(f"Error fetching channel page: {e}")
def extract_video_id(url: str) -> Optional[str]:
"""YouTube URL'den video ID çıkar"""
patterns = [
r'youtube\.com/watch\?v=([a-zA-Z0-9_-]{11})',
r'youtu\.be/([a-zA-Z0-9_-]{11})',
r'youtube\.com/embed/([a-zA-Z0-9_-]{11})'
]
for pattern in patterns:
match = re.search(pattern, url)
if match:
return match.group(1)
return None
def fetch_videos_from_rss_bridge(base_url: str, channel_id: str,
format: str = "Atom", max_items: int = 100) -> List[Dict]:
"""
RSS-Bridge'den video listesini çek
Args:
base_url: RSS-Bridge base URL
channel_id: YouTube Channel ID (UC...)
format: Feed format (Atom veya Rss)
max_items: Maksimum video sayısı
Returns:
Video metadata listesi
"""
params = {
'action': 'display',
'bridge': 'YoutubeBridge',
'context': 'By channel id',
'c': channel_id,
'format': format
}
feed_url = f"{base_url}/?{urlencode(params)}"
try:
feed = feedparser.parse(feed_url)
videos = []
for entry in feed.entries[:max_items]:
video_id = extract_video_id(entry.link)
if not video_id:
continue
# Tarih parsing
published_date = None
if hasattr(entry, 'published_parsed') and entry.published_parsed:
published_date = datetime(*entry.published_parsed[:6]).isoformat() + 'Z'
videos.append({
'video_id': video_id,
'video_title': entry.title,
'video_url': entry.link,
'published_at_utc': published_date,
'description': getattr(entry, 'summary', '')
})
return videos
except Exception as e:
raise Exception(f"Error fetching RSS-Bridge feed: {e}")

286
src/web_server.py Normal file
View File

@@ -0,0 +1,286 @@
"""
Flask web server - RSS-Bridge benzeri URL template sistemi
"""
from flask import Flask, request, Response, jsonify
from typing import Optional
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database import Database
from src.video_fetcher import fetch_videos_from_rss_bridge, get_channel_id_from_handle, extract_video_id
from src.transcript_extractor import TranscriptExtractor
from src.transcript_cleaner import TranscriptCleaner
from src.rss_generator import RSSGenerator
app = Flask(__name__)
# Global instances (lazy loading)
db = None
extractor = None
cleaner = None
def get_db():
"""Database instance'ı al (singleton)"""
global db
if db is None:
db = Database()
db.init_database()
return db
def get_extractor():
"""Transcript extractor instance'ı al"""
global extractor
if extractor is None:
extractor = TranscriptExtractor()
return extractor
def get_cleaner():
"""Transcript cleaner instance'ı al"""
global cleaner
if cleaner is None:
cleaner = TranscriptCleaner()
return cleaner
def normalize_channel_id(channel_id: Optional[str] = None,
channel: Optional[str] = None,
channel_url: Optional[str] = None) -> Optional[str]:
"""
Farklı formatlardan channel ID'yi normalize et
Args:
channel_id: Direkt Channel ID (UC...)
channel: Channel handle (@username) veya username
channel_url: Full YouTube channel URL
Returns:
Normalize edilmiş Channel ID veya None
"""
# Direkt Channel ID varsa
if channel_id:
if channel_id.startswith('UC') and len(channel_id) == 24:
return channel_id
# Eğer URL formatında ise parse et
if 'youtube.com/channel/' in channel_id:
parts = channel_id.split('/channel/')
if len(parts) > 1:
return parts[-1].split('?')[0].split('/')[0]
# Channel handle (@username)
if channel:
if not channel.startswith('@'):
channel = f"@{channel}"
handle_url = f"https://www.youtube.com/{channel}"
return get_channel_id_from_handle(handle_url)
# Channel URL
if channel_url:
# Handle URL
if '/@' in channel_url:
return get_channel_id_from_handle(channel_url)
# Channel ID URL
elif '/channel/' in channel_url:
parts = channel_url.split('/channel/')
if len(parts) > 1:
return parts[-1].split('?')[0].split('/')[0]
return None
def process_channel(channel_id: str, max_items: int = 50) -> dict:
"""
Kanal için transcript feed'i oluştur
Returns:
RSS feed string ve metadata
"""
db = get_db()
extractor = get_extractor()
cleaner = get_cleaner()
# RSS-Bridge'den videoları çek
try:
videos = fetch_videos_from_rss_bridge(
base_url="https://rss-bridge.org/bridge01",
channel_id=channel_id,
format="Atom",
max_items=max_items
)
except Exception as e:
raise Exception(f"RSS-Bridge hatası: {e}")
# Yeni videoları veritabanına ekle
for video in videos:
video['channel_id'] = channel_id
if not db.is_video_processed(video['video_id']):
db.add_video(video)
# Bekleyen videoları işle (ilk 20)
pending_videos = db.get_pending_videos()[:20]
for video in pending_videos:
if video['channel_id'] != channel_id:
continue
try:
# Transcript çıkar
transcript = extractor.fetch_transcript(
video['video_id'],
languages=['tr', 'en']
)
if transcript:
# Transcript temizle
raw, clean = cleaner.clean_transcript(transcript, sentences_per_paragraph=3)
# Veritabanına kaydet
db.update_video_transcript(
video['video_id'],
raw,
clean,
status=1,
language='tr'
)
except Exception as e:
print(f"Transcript çıkarım hatası {video['video_id']}: {e}")
db.mark_video_failed(video['video_id'], str(e))
# İşlenmiş videoları getir
processed_videos = db.get_processed_videos(
limit=max_items,
channel_id=channel_id
)
return {
'videos': processed_videos,
'channel_id': channel_id,
'count': len(processed_videos)
}
@app.route('/', methods=['GET'])
def generate_feed():
"""
RSS-Bridge benzeri URL template:
Örnekler:
- /?channel_id=UC9h8BDcXwkhZtnqoQJ7PggA&format=Atom
- /?channel=@tavakfi&format=Atom
- /?channel_url=https://www.youtube.com/@tavakfi&format=Atom
"""
# Query parametrelerini al
channel_id = request.args.get('channel_id')
channel = request.args.get('channel') # @username veya username
channel_url = request.args.get('channel_url')
format_type = request.args.get('format', 'Atom').lower() # Atom veya Rss
max_items = int(request.args.get('max_items', 50))
# Channel ID'yi normalize et
normalized_channel_id = normalize_channel_id(
channel_id=channel_id,
channel=channel,
channel_url=channel_url
)
if not normalized_channel_id:
return jsonify({
'error': 'Channel ID bulunamadı',
'usage': {
'channel_id': 'UC... (YouTube Channel ID)',
'channel': '@username veya username',
'channel_url': 'https://www.youtube.com/@username veya https://www.youtube.com/channel/UC...',
'format': 'Atom veya Rss (varsayılan: Atom)',
'max_items': 'Maksimum video sayısı (varsayılan: 50)'
}
}), 400
try:
# Kanalı işle
result = process_channel(normalized_channel_id, max_items=max_items)
if not result['videos']:
return jsonify({
'error': 'Henüz işlenmiş video yok',
'channel_id': normalized_channel_id,
'message': 'Lütfen birkaç dakika sonra tekrar deneyin'
}), 404
# RSS feed oluştur
channel_info = {
'id': normalized_channel_id,
'title': f"YouTube Transcript Feed - {normalized_channel_id}",
'link': f"https://www.youtube.com/channel/{normalized_channel_id}",
'description': f'Full-text transcript RSS feed for channel {normalized_channel_id}',
'language': 'en'
}
generator = RSSGenerator(channel_info)
for video in result['videos']:
generator.add_video_entry(video)
# Format'a göre döndür
if format_type == 'rss':
rss_content = generator.generate_rss_string()
return Response(
rss_content,
mimetype='application/rss+xml',
headers={'Content-Type': 'application/rss+xml; charset=utf-8'}
)
else: # Atom
# Feedgen Atom desteği
atom_content = generator.generate_atom_string()
return Response(
atom_content,
mimetype='application/atom+xml',
headers={'Content-Type': 'application/atom+xml; charset=utf-8'}
)
except Exception as e:
return jsonify({
'error': str(e),
'channel_id': normalized_channel_id
}), 500
@app.route('/health', methods=['GET'])
def health():
"""Health check endpoint"""
return jsonify({'status': 'ok', 'service': 'YouTube Transcript RSS Feed'})
@app.route('/info', methods=['GET'])
def info():
"""API bilgileri"""
return jsonify({
'service': 'YouTube Transcript RSS Feed Generator',
'version': '1.0.0',
'endpoints': {
'/': 'RSS Feed Generator',
'/health': 'Health Check',
'/info': 'API Info'
},
'usage': {
'channel_id': 'UC... (YouTube Channel ID)',
'channel': '@username veya username',
'channel_url': 'Full YouTube channel URL',
'format': 'Atom veya Rss (varsayılan: Atom)',
'max_items': 'Maksimum video sayısı (varsayılan: 50)'
},
'examples': [
'/?channel_id=UC9h8BDcXwkhZtnqoQJ7PggA&format=Atom',
'/?channel=@tavakfi&format=Rss',
'/?channel_url=https://www.youtube.com/@tavakfi&format=Atom&max_items=100'
]
})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)