Step 1: Email Integration
In this lesson, you will connect to Gmail using OAuth 2.0, build a client that fetches emails with pagination, parse message headers and body content, handle threads, and store everything in the SQLite database. By the end, you will have a working email pipeline that can pull your latest messages on demand.
Gmail OAuth 2.0 Authentication
The Gmail API requires OAuth 2.0 for user authentication. Our auth module handles the entire flow: loading credentials, opening the browser for consent, and saving the refresh token for future use.
# app/gmail/auth.py
"""Gmail OAuth 2.0 authentication handler."""
import os
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from app.config import config
def get_gmail_credentials() -> Credentials:
    """
    Get or refresh Gmail OAuth credentials.

    Loads a previously saved token from ``config.gmail_token_path`` when the
    file exists. If the loaded credentials are expired but carry a refresh
    token, they are refreshed; otherwise the interactive browser consent flow
    is started. Newly obtained or refreshed credentials are written back to
    ``config.gmail_token_path``.

    Returns:
        Valid Google OAuth2 Credentials object.
    """
    creds = None

    # Load existing token if available
    if os.path.exists(config.gmail_token_path):
        creds = Credentials.from_authorized_user_file(
            config.gmail_token_path, config.gmail_scopes
        )

    # Cached credentials are still usable: nothing to refresh or persist.
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        print("Refreshing expired Gmail token...")
        creds.refresh(Request())
    else:
        print("Starting Gmail OAuth flow...")
        print("A browser window will open for authorization.")
        flow = InstalledAppFlow.from_client_secrets_file(
            config.gmail_credentials_path, config.gmail_scopes
        )
        creds = flow.run_local_server(port=0)

    # Save the token for future runs.
    # A bare filename such as "token.json" has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError, so only create the
    # directory when the path actually has one.
    token_dir = os.path.dirname(config.gmail_token_path)
    if token_dir:
        os.makedirs(token_dir, exist_ok=True)
    with open(config.gmail_token_path, "w", encoding="utf-8") as token_file:
        token_file.write(creds.to_json())
    print(f"Token saved to {config.gmail_token_path}")

    return creds
def get_gmail_service():
    """
    Create an authenticated Gmail API service object.

    Obtains credentials via get_gmail_credentials() and passes them to
    build() for the "gmail" API, version "v1".

    Returns:
        googleapiclient.discovery.Resource for Gmail API v1.
    """
    return build("gmail", "v1", credentials=get_gmail_credentials())
After the first successful authorization, a token.json file is saved so you will not be prompted again until the token expires.
Gmail API Client
Now build the client that fetches emails, handles pagination, and retrieves full message details:
# app/gmail/client.py
"""Gmail API client for fetching and sending emails."""
import base64
from email.mime.text import MIMEText
from typing import Optional
from app.gmail.auth import get_gmail_service
from app.config import config
class GmailClient:
    """Client for interacting with the Gmail API.

    Wraps the service object returned by get_gmail_service() and always acts
    on behalf of the authenticated user (user id "me").
    """

    def __init__(self):
        self.service = get_gmail_service()
        self.user_id = "me"

    def fetch_messages(
        self,
        query: str = "is:inbox",
        max_results: Optional[int] = None,
        page_token: Optional[str] = None
    ) -> dict:
        """
        Fetch one page of message IDs matching a query.

        Args:
            query: Gmail search query (e.g., "is:inbox", "is:unread").
            max_results: Maximum number of messages to return. Defaults to
                config.max_emails_per_fetch when None.
            page_token: Token for pagination.

        Returns:
            Dict with 'messages' list and optional 'nextPageToken'.
        """
        if max_results is None:
            max_results = config.max_emails_per_fetch

        params = {
            "userId": self.user_id,
            "q": query,
            "maxResults": max_results,
        }
        # Only send pageToken when continuing a previous listing.
        if page_token:
            params["pageToken"] = page_token

        return self.service.users().messages().list(**params).execute()

    def get_message(self, message_id: str, format: str = "full") -> dict:
        """
        Get full message details by ID.

        Args:
            message_id: Gmail message ID.
            format: Response format ('full', 'metadata', 'raw').

        Returns:
            Full message resource dict.
        """
        return (
            self.service.users()
            .messages()
            .get(userId=self.user_id, id=message_id, format=format)
            .execute()
        )

    def get_thread(self, thread_id: str) -> dict:
        """
        Get all messages in a thread.

        Args:
            thread_id: Gmail thread ID.

        Returns:
            Thread resource dict with all messages.
        """
        return (
            self.service.users()
            .threads()
            .get(userId=self.user_id, id=thread_id)
            .execute()
        )

    def fetch_all_messages(
        self, query: str = "is:inbox", max_total: Optional[int] = None
    ) -> list:
        """
        Fetch all messages matching a query with automatic pagination.

        Pages are requested until max_total references have been collected
        or the API stops returning a nextPageToken.

        Args:
            query: Gmail search query.
            max_total: Maximum total messages to fetch. Defaults to
                config.max_emails_per_fetch when None.

        Returns:
            List of message resource dicts (IDs only).
        """
        if max_total is None:
            max_total = config.max_emails_per_fetch

        all_messages = []
        page_token = None

        while len(all_messages) < max_total:
            remaining = max_total - len(all_messages)
            # Request at most 100 per page (the API's default page size;
            # the documented maximum is 500).
            batch_size = min(remaining, 100)

            result = self.fetch_messages(
                query=query,
                max_results=batch_size,
                page_token=page_token
            )

            # A page may legitimately carry no 'messages' key.
            all_messages.extend(result.get("messages", []))

            page_token = result.get("nextPageToken")
            if not page_token:
                break

        print(f"Fetched {len(all_messages)} message IDs")
        return all_messages[:max_total]

    def send_message(
        self,
        to: str,
        subject: str,
        body: str,
        thread_id: Optional[str] = None,
        in_reply_to: Optional[str] = None
    ) -> dict:
        """
        Send an email message.

        Args:
            to: Recipient email address.
            subject: Email subject line.
            body: Plain text email body.
            thread_id: Gmail thread ID (for replies).
            in_reply_to: Message-ID header (for threading).

        Returns:
            Sent message resource dict.
        """
        message = MIMEText(body)
        message["to"] = to
        message["subject"] = subject

        # Both headers are set to the same Message-ID so mail clients can
        # attach the reply to the original conversation.
        if in_reply_to:
            message["In-Reply-To"] = in_reply_to
            message["References"] = in_reply_to

        # The API expects the whole RFC 822 message as URL-safe base64 text.
        raw = base64.urlsafe_b64encode(
            message.as_bytes()
        ).decode("utf-8")

        body_payload = {"raw": raw}
        if thread_id:
            body_payload["threadId"] = thread_id

        sent = (
            self.service.users()
            .messages()
            .send(userId=self.user_id, body=body_payload)
            .execute()
        )

        print(f"Message sent. ID: {sent['id']}")
        return sent

    def modify_labels(
        self,
        message_id: str,
        add_labels: Optional[list] = None,
        remove_labels: Optional[list] = None
    ) -> dict:
        """
        Add or remove labels from a message.

        Args:
            message_id: Gmail message ID.
            add_labels: List of label IDs to add.
            remove_labels: List of label IDs to remove.

        Returns:
            Modified message resource dict.
        """
        body = {
            "addLabelIds": add_labels or [],
            "removeLabelIds": remove_labels or [],
        }

        return (
            self.service.users()
            .messages()
            .modify(userId=self.user_id, id=message_id, body=body)
            .execute()
        )
Email Parser
Raw Gmail API responses are nested and base64-encoded. The parser extracts clean, usable data:
# app/gmail/parser.py
"""Parse Gmail API message responses into clean data."""
import base64
from datetime import datetime
from typing import Optional
from bs4 import BeautifulSoup
import html2text
def parse_message(message: dict) -> dict:
    """
    Flatten a Gmail message resource into a plain dictionary.

    Header names are lower-cased for lookup. The date, sender, body and
    attachment flag are derived by the private helpers in this module.

    Args:
        message: Gmail API message resource (format='full').

    Returns:
        Dict with subject, sender, date, body_text, body_html, etc.
    """
    payload = message.get("payload", {})
    header_map = {
        header["name"].lower(): header["value"]
        for header in payload.get("headers", [])
    }

    # Date header -> datetime (None when it cannot be parsed)
    date = _parse_date(header_map.get("date", ""))

    # From header -> display name and bare address
    sender_name, sender_email = _parse_sender(header_map.get("from", ""))

    # Plain-text and HTML bodies from the MIME tree
    body_text, body_html = _extract_body(payload)

    # True when any part carries a filename
    has_attachments = _has_attachments(payload)

    labels = message.get("labelIds", [])

    parsed = {}
    parsed["gmail_id"] = message["id"]
    parsed["thread_id"] = message.get("threadId", "")
    parsed["subject"] = header_map.get("subject", "(no subject)")
    parsed["sender"] = sender_email
    parsed["sender_name"] = sender_name
    parsed["recipient"] = header_map.get("to", "")
    parsed["date"] = date
    parsed["snippet"] = message.get("snippet", "")
    parsed["body_text"] = body_text
    parsed["body_html"] = body_html
    parsed["labels"] = ",".join(labels)
    # A message counts as read when it does not carry the UNREAD label.
    parsed["is_read"] = "UNREAD" not in labels
    parsed["has_attachments"] = has_attachments
    parsed["message_id"] = header_map.get("message-id", "")
    return parsed
def _parse_date(date_str: str) -> Optional[datetime]:
"""Parse an email date string into a datetime object."""
from email.utils import parsedate_to_datetime
try:
return parsedate_to_datetime(date_str)
except (ValueError, TypeError):
return None
def _parse_sender(sender_raw: str) -> tuple:
"""
Parse 'John Doe ' into (name, email).
Returns:
Tuple of (sender_name, sender_email).
"""
if "<" in sender_raw and ">" in sender_raw:
name = sender_raw.split("<")[0].strip().strip('"')
email = sender_raw.split("<")[1].split(">")[0].strip()
return name, email
return "", sender_raw.strip()
def _extract_body(payload: dict) -> tuple:
"""
Recursively extract text and HTML body from message payload.
Gmail messages can have nested MIME parts. This function
walks the tree to find text/plain and text/html parts.
Returns:
Tuple of (body_text, body_html).
"""
body_text = ""
body_html = ""
mime_type = payload.get("mimeType", "")
# Single-part message
if mime_type == "text/plain":
data = payload.get("body", {}).get("data", "")
if data:
body_text = base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")
elif mime_type == "text/html":
data = payload.get("body", {}).get("data", "")
if data:
body_html = base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")
# Multi-part message - recurse into parts
parts = payload.get("parts", [])
for part in parts:
part_text, part_html = _extract_body(part)
if part_text and not body_text:
body_text = part_text
if part_html and not body_html:
body_html = part_html
# If we only have HTML, convert to text
if body_html and not body_text:
converter = html2text.HTML2Text()
converter.ignore_links = False
converter.ignore_images = True
body_text = converter.handle(body_html)
return body_text, body_html
def _has_attachments(payload: dict) -> bool:
"""Check if the message has file attachments."""
parts = payload.get("parts", [])
for part in parts:
if part.get("filename"):
return True
# Check nested parts
if _has_attachments(part):
return True
return False
def get_thread_context(messages: list, current_id: str) -> str:
    """
    Build conversation context from a thread for LLM input.

    Every message except the current one is rendered as a
    "From: <sender> (<date>)" line followed by at most the first 500
    characters of its text body; entries are separated by "---" lines.

    Args:
        messages: List of parsed message dicts in the thread.
        current_id: Gmail ID of the current message.

    Returns:
        Formatted thread context string ("" when there is nothing to show).
    """
    context_parts = []
    for msg in messages:
        if msg["gmail_id"] == current_id:
            continue  # Skip the current message

        # Fall through empty strings as well as missing keys, so a message
        # with no usable sender is labelled "Unknown" rather than "".
        sender = msg.get("sender_name") or msg.get("sender") or "Unknown"

        # The date may be None when the Date header could not be parsed;
        # render that as empty rather than the literal text "None".
        date = msg.get("date") or ""
        if isinstance(date, datetime):
            date = date.strftime("%Y-%m-%d %H:%M")

        # Guard against a None body before slicing; truncate long bodies.
        body = (msg.get("body_text") or "")[:500]

        context_parts.append(f"From: {sender} ({date})\n{body}")

    return "\n---\n".join(context_parts)
Storing Emails in the Database
Now tie everything together with a function that fetches emails and stores them:
# app/gmail/__init__.py
"""Gmail integration - fetch, parse, and store emails."""
from app.gmail.client import GmailClient
from app.gmail.parser import parse_message
from app.database import SessionLocal, Email
def fetch_and_store_emails(
    query: str = "is:inbox newer_than:1d",
    max_results: int = 50
) -> list:
    """
    Fetch emails from Gmail and store them in the database.

    Messages whose gmail_id is already stored are skipped. All new rows are
    committed together; on any error the transaction is rolled back and the
    exception is re-raised.

    Args:
        query: Gmail search query.
        max_results: Maximum emails to fetch.

    Returns:
        List of newly stored Email objects.
    """
    # Columns copied verbatim from the parsed message dict onto the record.
    stored_fields = (
        "gmail_id",
        "thread_id",
        "subject",
        "sender",
        "sender_name",
        "recipient",
        "date",
        "snippet",
        "body_text",
        "body_html",
        "labels",
        "is_read",
        "has_attachments",
    )

    client = GmailClient()
    session = SessionLocal()
    new_emails = []

    try:
        # Fetch message IDs
        message_ids = client.fetch_all_messages(
            query=query, max_total=max_results
        )

        for msg_ref in message_ids:
            msg_id = msg_ref["id"]

            # Skip if already in database
            existing = (
                session.query(Email)
                .filter(Email.gmail_id == msg_id)
                .first()
            )
            if existing:
                continue

            # Fetch full message and parse
            raw_message = client.get_message(msg_id)
            parsed = parse_message(raw_message)

            # Create database record
            email = Email(**{field: parsed[field] for field in stored_fields})
            session.add(email)
            new_emails.append(email)

        session.commit()

        # NOTE(review): assumes SessionLocal is a SQLAlchemy session factory
        # using the default expire_on_commit=True - confirm in app.database.
        # In that configuration commit() expires every attribute and close()
        # detaches the objects, so callers reading e.g. email.subject on the
        # returned list would fail. Reloading here keeps the values readable
        # after the session is closed.
        for email in new_emails:
            session.refresh(email)

        print(f"Stored {len(new_emails)} new emails")

    except Exception as e:
        session.rollback()
        print(f"Error fetching emails: {e}")
        raise
    finally:
        session.close()

    return new_emails
Test the Email Integration
# Test fetching emails (run after OAuth setup).
# The Python passed to -c is indentation-sensitive: the body of the for loop
# must stay indented or the interpreter rejects the snippet.
python -c "
from app.gmail import fetch_and_store_emails
emails = fetch_and_store_emails(
    query='is:inbox newer_than:1d',
    max_results=10
)
for e in emails:
    print(f' [{e.sender}] {e.subject}')
print(f'Total: {len(emails)} new emails stored')
"
Key Takeaways
- OAuth 2.0 handles authentication securely. The refresh token is saved so you only authorize once.
- The Gmail API returns nested, base64-encoded payloads. Our parser recursively extracts text and HTML bodies from multipart messages.
- Pagination with `nextPageToken` ensures we can fetch large batches of emails without hitting API limits.
- Deduplication by `gmail_id` prevents storing the same email twice on repeated fetches.
- The `send_message` method supports threading by passing `thread_id` and `In-Reply-To` headers.
What Is Next
In the next lesson, you will build the email classification engine — using an LLM to detect priority levels, assign category tags, and analyze sentiment for every incoming email.
Lilly Tech Systems