import os
import pandas as pd
from bson import ObjectId
from dotenv import load_dotenv
from pymongo import MongoClient
from src.utils.logger import setup_logger
logger = setup_logger()
# Load environment variables
load_dotenv()
[docs]
def get_mongo_client():
"""
Connects to MongoDB and returns the database object.
Uses environment variables for connection:
MONGO_USERNAME: username for MongoDB authentication
MONGO_PASSWORD: password for MongoDB authentication
MONGO_DB_NAME: name of the database to connect to
Returns:
pymongo.database.Database: the connected database object
Raises:
Exception: if connection fails
"""
try:
mongo_uri = f"mongodb+srv://{os.getenv('MONGO_USERNAME')}:{os.getenv('MONGO_PASSWORD')}@devasy23.a8hxla5.mongodb.net/?retryWrites=true&w=majority&appName=Devasy23"
db_name = os.getenv("DB_NAME")
client = MongoClient(
mongo_uri, socketTimeoutMS=60000, connectTimeoutMS=60000)
db = client[db_name]
logger.info("Successfully connected to MongoDB.")
return db
except Exception as e:
logger.error(f"Failed to connect to MongoDB: {e}")
raise
[docs]
def content_manager(article_id, required_fields):
"""
Checks if the specified fields are present in the database for the given article_id.
Args:
article_id (str): The ID of the article to check.
required_fields (list): A list of fields to check for presence (e.g., ["content", "summary", "keywords", "sentiment"]).
Returns:
dict: A dictionary with the status of each field (True if present, False if not).
"""
# Connect to the MongoDB
db = get_mongo_client()
collection = db["News_Articles"]
# Query the document by article_id
article = collection.find_one({"id": article_id})
# Check for the required fields
field_status = {
field: field in article and bool(article[field]) for field in required_fields
}
return field_status
[docs]
def insert_document(collection_name, document):
"""
Inserts a document into the given collection.
Args:
collection_name (str): The name of the collection.
document (dict): The document to be inserted.
Returns:
str: The ID of the inserted document.
Raises:
Exception: If there is an error inserting the document.
"""
db = get_mongo_client()
collection = db[collection_name]
try:
result = collection.insert_one(document)
logger.info(f"Document inserted with ID: {result.inserted_id}")
return result.inserted_id
except Exception as e:
logger.error(f"Failed to insert document: {e}")
raise
[docs]
def find_one_document(collection_name, query):
"""
Finds a single document in the given MongoDB collection using the given query.
Args:
collection_name (str): The name of the collection.
query (dict): The query to select documents.
Returns:
dict: The selected document.
Raises:
Exception: If there is an error finding the document.
"""
db = get_mongo_client()
collection = db[collection_name]
try:
result = collection.find_one(query)
return result
except Exception as e:
logger.error(f"Failed to find document: {e}")
raise
[docs]
def append_to_document(collection_name, query, update_data):
"""
Appends new data to an existing document in the MongoDB collection.
Args:
collection_name (str): The name of the MongoDB collection.
query (dict): The query to select the document to update.
update_data (dict): The new data to be appended to the document.
Returns:
int: The number of documents updated.
"""
db = get_mongo_client()
collection = db[collection_name]
try:
result = collection.update_one(query, {"$set": update_data})
if result.modified_count > 0:
logger.info(f"Document updated successfully.")
else:
logger.warning(
f"No document matched the query. No update performed.")
return result.modified_count
except Exception as e:
logger.error(f"Failed to update document: {e}")
raise
[docs]
def find_documents(collection_name, query):
"""
Finds documents in the given MongoDB collection using the given query.
Args:
collection_name (str): The name of the MongoDB collection.
query (dict): The query to select documents.
Returns:
list: A list of documents found by the query.
Raises:
Exception: If there is an error finding documents.
"""
db = get_mongo_client()
collection = db[collection_name]
try:
documents = collection.find(query)
return documents
except Exception as e:
logger.error(f"Failed to find documents: {e}")
raise
[docs]
def fetch_and_combine_articles(collection_name, article_ids):
"""
Fetches documents from the given MongoDB collection using the given IDs and combines them into a Pandas DataFrame.
Args:
collection_name (str): The name of the MongoDB collection.
article_ids (List[str]): List of IDs of the articles to fetch and combine.
Returns:
pd.DataFrame: A Pandas DataFrame containing the combined documents.
Raises:
Exception: If there is an error fetching and combining the documents.
"""
db = get_mongo_client()
collection = db[collection_name]
# Debug log to check what is being passed to the function
logger.debug(f"Received article_ids: {article_ids}")
try:
# Ensure article_ids is a list and not None
# Query MongoDB to find documents by their IDs
query = {"id": {"$in": article_ids}}
documents = collection.find(query)
logger.info(f"Fetched {documents} documents for the given IDs.")
# Prepare a list of documents
docs = []
for doc in documents:
doc["_id"] = str(
doc["_id"]
) # Convert ObjectId to string for easier handling
docs.append(doc)
# Convert the list of documents to a DataFrame
df = pd.DataFrame(docs)
print(df.drop(columns=["_id", "id"], inplace=True))
if df.empty:
logger.warning("No documents found for the provided article IDs.")
else:
logger.info("Successfully converted documents to DataFrame.")
logger.debug(df.columns)
return df
except Exception as e:
logger.error(f"Failed to fetch and combine articles: {e}")
raise