A monitoring and analysis system for Chilean news media. It processes news in real-time, generates semantic embeddings, and enables intelligent content search.
Motivation
Researchers, journalists, and analysts need to monitor multiple news outlets daily. Doing this manually is: - Tedious: reviewing 20+ sites every day - Incomplete: it is easy to miss relevant news - Lacking context: it is difficult to connect related topics
Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ OpenMedia Pipeline │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ │
│ │ Crawlers │───▶│ Kafka │───▶│ Processor│───▶│ PostgreSQL │ │
│ │ (async) │ │ Topics │ │ Workers │ │ + pgvector │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────┐ ┌──────────────┐ │
│ │ Redis │ │ Search API │ │
│ │ Cache │ │ (FastAPI) │ │
│ └──────────┘ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Components
Async Crawlers
Specialized scrapers for each media outlet:
class BaseCrawler:
    """Base class for all media crawlers.

    Subclasses declare `feed_urls` and implement `parse_page` for the
    outlet-specific HTML structure.
    """

    async def fetch_articles(self) -> List[RawArticle]:
        """Fetch all feed URLs concurrently and parse them into articles.

        Returns a flat list of RawArticle from every successfully fetched
        page. A failure on one page no longer aborts the whole batch:
        `gather(..., return_exceptions=True)` lets the remaining pages
        still be parsed.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch_page(session, url) for url in self.feed_urls]
            pages = await asyncio.gather(*tasks, return_exceptions=True)
        articles: List[RawArticle] = []
        for page in pages:
            # Skip pages whose fetch raised instead of failing everything.
            if isinstance(page, BaseException):
                continue
            articles.extend(self.parse_page(page))
        return articles

    async def fetch_page(self, session, url: str) -> str:
        """Download one page and return its body as text.

        Raises aiohttp.ClientResponseError on 4xx/5xx so error pages are
        never silently handed to the parser.
        """
        async with session.get(url, headers=self.headers) as response:
            response.raise_for_status()
            return await response.text()

    @abstractmethod
    def parse_page(self, html: str) -> List[RawArticle]:
        """Parse one page of HTML into RawArticle items (per-outlet logic)."""
        pass
class EmolCrawler(BaseCrawler):
    """Crawler for emol.com national and economy sections."""

    feed_urls = [
        'https://www.emol.com/noticias/Nacional/',
        'https://www.emol.com/noticias/Economia/',
    ]

    def parse_page(self, html: str) -> List[RawArticle]:
        """Extract article cards from an Emol section page.

        Cards missing a link or a headline (ad slots, layout changes)
        are skipped instead of raising, which is what the original
        `select_one(...)[...]` chain did on malformed cards.
        """
        soup = BeautifulSoup(html, 'lxml')
        articles = []
        for card in soup.select('.col_center_noticia'):
            link = card.select_one('a')
            headline = card.select_one('h1')
            # Defensive: either element may be absent on non-article cards.
            if link is None or not link.get('href') or headline is None:
                continue
            articles.append(RawArticle(
                url=link['href'],
                title=headline.text.strip(),
                source='emol',
                # NOTE(review): naive local timestamp — consider
                # datetime.now(timezone.utc) for unambiguous storage.
                scraped_at=datetime.now()
            ))
        return articles
Kafka Pipeline
Real-time event processing:
# Producer: Crawlers send raw articles
async def produce_articles(articles: List[RawArticle]):
    """Publish raw articles to the 'raw-articles' Kafka topic.

    Messages are keyed by article URL so re-scrapes of the same article
    land in the same partition and can be deduplicated downstream.
    """
    producer = AIOKafkaProducer(bootstrap_servers='kafka:9092')
    await producer.start()
    try:
        for article in articles:
            await producer.send(
                'raw-articles',
                value=article.json().encode(),
                key=article.url.encode()
            )
    finally:
        # Always flush and close the producer, even if a send fails;
        # the original leaked the connection on any exception.
        await producer.stop()
# Consumer: Workers process and enrich
async def consume_and_process():
    """Consume raw articles, enrich them, and persist with embeddings.

    Pipeline per message: parse → fetch full body → embed → store in
    PostgreSQL/pgvector.
    """
    consumer = AIOKafkaConsumer(
        'raw-articles',
        bootstrap_servers='kafka:9092',
        group_id='article-processors'
    )
    # aiokafka consumers must be started before iteration; the original
    # skipped start() and would fail on the first fetch.
    await consumer.start()
    try:
        async for msg in consumer:
            article = RawArticle.parse_raw(msg.value)
            # Feeds usually carry only a teaser; fetch the full body.
            full_content = await fetch_full_article(article.url)
            # 384-dim sentence embedding used for semantic search.
            embedding = generate_embedding(full_content.text)
            # Persist article + vector in PostgreSQL + pgvector.
            await save_article_with_embedding(
                article=full_content,
                embedding=embedding
            )
    finally:
        await consumer.stop()
Embeddings with Transformers
Using multilingual models for semantic representation:
from sentence_transformers import SentenceTransformer
# Model optimized for Spanish
# Multilingual sentence encoder, well-suited to Spanish-language news.
model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')


def generate_embedding(text: str) -> List[float]:
    """Encode *text* into a normalized 384-dimension embedding vector."""
    # Cap input length first — a rough character-count proxy for the
    # model's 512-token context window.
    truncated = text[:2000]
    vector = model.encode(truncated, normalize_embeddings=True)
    return vector.tolist()
Semantic Search with pgvector
-- Enable pgvector; IF NOT EXISTS makes this migration safe to re-run
-- (the original failed on a second execution).
CREATE EXTENSION IF NOT EXISTS vector;

-- Articles table with their sentence embedding.
CREATE TABLE articles (
    id SERIAL PRIMARY KEY,
    url TEXT UNIQUE,           -- dedup key: one row per article URL
    title TEXT,
    content TEXT,
    source VARCHAR(50),        -- outlet identifier, e.g. 'emol'
    published_at TIMESTAMP,
    embedding vector(384)      -- matches the MiniLM model's output size
);

-- IVFFlat approximate-nearest-neighbor index on cosine distance for
-- fast semantic search; lists = 100 is the snippet's chosen bucket count.
CREATE INDEX ON articles
    USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
# Semantic search
async def semantic_search(query: str, limit: int = 10) -> List[Article]:
    """Return the *limit* articles nearest to *query* in embedding space.

    Similarity is 1 - cosine distance (pgvector's <=> operator), so
    higher means more similar; rows come back best-match first.
    """
    query_embedding = generate_embedding(query)
    sql = """
    SELECT *, 1 - (embedding <=> :query_embedding) as similarity
    FROM articles
    ORDER BY embedding <=> :query_embedding
    LIMIT :limit
    """
    params = {
        'query_embedding': str(query_embedding),
        'limit': limit,
    }
    rows = await db.fetch_all(sql, params)
    return [Article(**row) for row in rows]
Search API
@app.get("/search")
async def search(
    q: str,
    source: Optional[str] = None,
    date_from: Optional[date] = None,
    limit: int = 20
):
    """Semantic search over articles with optional source/date filters.

    Over-fetches 2x the requested limit so post-filtering still has a
    chance to fill the page; may return fewer than *limit* results when
    the filters are selective.
    """
    results = await semantic_search(q, limit=limit * 2)
    if source:
        results = [r for r in results if r.source == source]
    if date_from:
        # published_at is a datetime; comparing it against a bare date
        # raises TypeError, so promote date_from to midnight of that day.
        cutoff = datetime.combine(date_from, datetime.min.time())
        results = [r for r in results if r.published_at >= cutoff]
    return results[:limit]
@app.get("/related/{article_id}")
async def get_related(article_id: int, limit: int = 5):
    """Find articles semantically similar to the given one.

    Returns an empty list when the article does not exist; the original
    crashed on `article.embedding` for a missing id.
    """
    article = await get_article(article_id)
    if article is None:
        # NOTE(review): assumes get_article returns None for a missing
        # id — confirm; an HTTP 404 response may be preferable here.
        return []
    return await db.fetch_all("""
    SELECT *, 1 - (embedding <=> :embedding) as similarity
    FROM articles
    WHERE id != :article_id
    ORDER BY embedding <=> :embedding
    LIMIT :limit
    """, {
        'embedding': str(article.embedding),
        'article_id': article_id,
        'limit': limit
    })
Results
| Metric | Value |
|---|---|
| Media outlets monitored | 15+ |
| Articles processed/day | ~2,000 |
| Indexing latency | < 5 min |
| Search precision | ~85% |
Use Cases
- Topic monitoring: Alerts when news about specific topics appears
- Coverage analysis: How different media cover the same event
- Research: Search historical articles by semantic similarity
- Trend detection: Identify emerging topics
Key Technologies
- aiohttp: High-performance async crawling
- Kafka: Message queue for distributed processing
- pgvector: Vector search in PostgreSQL
- Sentence Transformers: Multilingual embeddings
- Docker Compose: Service orchestration