Intermediate

Step 1: Email Integration

In this lesson, you will connect to Gmail using OAuth 2.0, build a client that fetches emails with pagination, parse message headers and body content, handle threads, and store everything in the SQLite database. By the end, you will have a working email pipeline that can pull your latest messages on demand.

Gmail OAuth 2.0 Authentication

The Gmail API requires OAuth 2.0 for user authentication. Our auth module handles the entire flow: loading credentials, opening the browser for consent, and saving the refresh token for future use.

# app/gmail/auth.py
"""Gmail OAuth 2.0 authentication handler."""
import os
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

from app.config import config


def get_gmail_credentials() -> Credentials:
    """
    Get or refresh Gmail OAuth credentials.

    Loads the saved token from ``config.gmail_token_path`` if it exists.
    An expired token that has a refresh token is refreshed. If the refresh
    is rejected (e.g. the grant was revoked), or there is no usable token at
    all, the interactive browser consent flow is run instead. Any new or
    refreshed credentials are written back to the token file.

    Returns:
        Valid Google OAuth2 Credentials object.
    """
    # Local import: only needed to recognise a rejected refresh.
    from google.auth.exceptions import RefreshError

    creds = None

    # Load existing token if available
    if os.path.exists(config.gmail_token_path):
        creds = Credentials.from_authorized_user_file(
            config.gmail_token_path, config.gmail_scopes
        )

    # Nothing to do if the saved credentials are still usable.
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        print("Refreshing expired Gmail token...")
        try:
            creds.refresh(Request())
        except RefreshError as err:
            # A revoked or expired refresh token cannot be refreshed; fall
            # back to a fresh consent flow instead of failing outright.
            print(f"Token refresh failed ({err}); re-authorizing...")
            creds = None

    if not creds or not creds.valid:
        print("Starting Gmail OAuth flow...")
        print("A browser window will open for authorization.")
        flow = InstalledAppFlow.from_client_secrets_file(
            config.gmail_credentials_path, config.gmail_scopes
        )
        creds = flow.run_local_server(port=0)

    # Save the token for future runs. dirname() is "" for a bare filename
    # such as "token.json", and os.makedirs("") raises, so only create a
    # directory when the path actually has one.
    token_dir = os.path.dirname(config.gmail_token_path)
    if token_dir:
        os.makedirs(token_dir, exist_ok=True)
    with open(config.gmail_token_path, "w", encoding="utf-8") as token_file:
        token_file.write(creds.to_json())
    print(f"Token saved to {config.gmail_token_path}")

    return creds


def get_gmail_service():
    """
    Create an authorized client for the Gmail API (v1).

    Credentials are obtained via get_gmail_credentials(), which may refresh
    the saved token or run the browser consent flow first.

    Returns:
        googleapiclient.discovery.Resource for Gmail API v1.
    """
    return build("gmail", "v1", credentials=get_gmail_credentials())
📝
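First Run: The first time you run this, a browser window will open asking you to authorize the app. After you grant permission, the credentials (including a refresh token) are saved to the token file (config.gmail_token_path, e.g. token.json). Later runs reuse them and refresh the short-lived access token automatically. You will not be prompted again unless the refresh token is revoked or expires. Note that refresh tokens issued to apps whose OAuth consent screen is in "Testing" status expire after 7 days.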

Gmail API Client

Now build the client that fetches emails, handles pagination, and retrieves full message details:

# app/gmail/client.py
"""Gmail API client for fetching and sending emails."""
import base64
from email.mime.text import MIMEText
from typing import Optional

from app.gmail.auth import get_gmail_service
from app.config import config


class GmailClient:
    """Client for interacting with the Gmail API on behalf of the signed-in user."""

    # users.messages.list accepts at most 500 results per page (default 100).
    MAX_PAGE_SIZE = 500

    def __init__(self):
        # get_gmail_service() may refresh the saved token or run the OAuth flow.
        self.service = get_gmail_service()
        # "me" is the Gmail API alias for the authenticated user.
        self.user_id = "me"

    def fetch_messages(
        self,
        query: str = "is:inbox",
        max_results: Optional[int] = None,
        page_token: Optional[str] = None
    ) -> dict:
        """
        Fetch one page of message IDs matching a query.

        Args:
            query: Gmail search query (e.g., "is:inbox", "is:unread").
            max_results: Maximum number of messages to return in this page.
                Defaults to ``config.max_emails_per_fetch``.
            page_token: The previous page's ``nextPageToken``, if any.

        Returns:
            Dict with 'messages' list and optional 'nextPageToken'.
        """
        if max_results is None:
            max_results = config.max_emails_per_fetch

        params = {
            "userId": self.user_id,
            "q": query,
            "maxResults": max_results,
        }
        if page_token:
            params["pageToken"] = page_token

        result = self.service.users().messages().list(**params).execute()
        return result

    def get_message(self, message_id: str, format: str = "full") -> dict:
        """
        Get message details by ID.

        Args:
            message_id: Gmail message ID.
            format: Response format ('full', 'metadata', 'minimal', 'raw').

        Returns:
            Message resource dict in the requested format.
        """
        message = (
            self.service.users()
            .messages()
            .get(userId=self.user_id, id=message_id, format=format)
            .execute()
        )
        return message

    def get_thread(self, thread_id: str) -> dict:
        """
        Get all messages in a thread.

        Args:
            thread_id: Gmail thread ID.

        Returns:
            Thread resource dict with all messages.
        """
        thread = (
            self.service.users()
            .threads()
            .get(userId=self.user_id, id=thread_id)
            .execute()
        )
        return thread

    def fetch_all_messages(
        self, query: str = "is:inbox", max_total: Optional[int] = None
    ) -> list:
        """
        Fetch message IDs matching a query, following nextPageToken.

        Pages are requested with at most ``MAX_PAGE_SIZE`` results each, and
        never more than are still needed to reach ``max_total``.

        Args:
            query: Gmail search query.
            max_total: Maximum total messages to fetch.
                Defaults to ``config.max_emails_per_fetch``.

        Returns:
            List of message reference dicts (IDs only), at most max_total long.
        """
        if max_total is None:
            max_total = config.max_emails_per_fetch

        all_messages = []
        page_token = None

        while len(all_messages) < max_total:
            remaining = max_total - len(all_messages)
            batch_size = min(remaining, self.MAX_PAGE_SIZE)

            result = self.fetch_messages(
                query=query,
                max_results=batch_size,
                page_token=page_token
            )

            messages = result.get("messages", [])
            all_messages.extend(messages)

            page_token = result.get("nextPageToken")
            if not page_token:
                break

        print(f"Fetched {len(all_messages)} message IDs")
        return all_messages[:max_total]

    def send_message(
        self,
        to: str,
        subject: str,
        body: str,
        thread_id: Optional[str] = None,
        in_reply_to: Optional[str] = None
    ) -> dict:
        """
        Send a plain-text email message.

        For Gmail to place a reply in an existing thread, pass the thread's
        ``thread_id`` and the original's Message-ID as ``in_reply_to``, and
        keep the subject matching the original.

        Args:
            to: Recipient email address.
            subject: Email subject line.
            body: Plain text email body.
            thread_id: Gmail thread ID (for replies).
            in_reply_to: Message-ID header of the message being answered;
                also used as the References header.

        Returns:
            Sent message resource dict.
        """
        message = MIMEText(body)
        message["to"] = to
        message["subject"] = subject

        if in_reply_to:
            message["In-Reply-To"] = in_reply_to
            message["References"] = in_reply_to

        # The API expects the full RFC 2822 message as base64url text.
        raw = base64.urlsafe_b64encode(
            message.as_bytes()
        ).decode("utf-8")

        body_payload = {"raw": raw}
        if thread_id:
            body_payload["threadId"] = thread_id

        sent = (
            self.service.users()
            .messages()
            .send(userId=self.user_id, body=body_payload)
            .execute()
        )
        print(f"Message sent. ID: {sent['id']}")
        return sent

    def modify_labels(
        self,
        message_id: str,
        add_labels: Optional[list] = None,
        remove_labels: Optional[list] = None
    ) -> dict:
        """
        Add or remove labels from a message.

        Args:
            message_id: Gmail message ID.
            add_labels: List of label IDs to add.
            remove_labels: List of label IDs to remove.

        Returns:
            Modified message resource dict.
        """
        body = {
            "addLabelIds": add_labels or [],
            "removeLabelIds": remove_labels or [],
        }
        result = (
            self.service.users()
            .messages()
            .modify(userId=self.user_id, id=message_id, body=body)
            .execute()
        )
        return result

Email Parser

Raw Gmail API responses are nested and base64-encoded. The parser extracts clean, usable data:

# app/gmail/parser.py
"""Parse Gmail API message responses into clean data."""
import base64
from datetime import datetime
from typing import Optional
from bs4 import BeautifulSoup
import html2text


def parse_message(message: dict) -> dict:
    """
    Flatten a Gmail API message resource into a plain dictionary.

    Args:
        message: Gmail API message resource fetched with format='full'.

    Returns:
        Dict with keys gmail_id, thread_id, subject, sender, sender_name,
        recipient, date, snippet, body_text, body_html, labels (label IDs
        joined with commas), is_read, has_attachments and message_id.
    """
    payload = message.get("payload", {})

    # Header names are case-insensitive, so index them by lowercase name.
    headers = {
        entry["name"].lower(): entry["value"]
        for entry in payload.get("headers", [])
    }

    sent_at = _parse_date(headers.get("date", ""))
    display_name, address = _parse_sender(headers.get("from", ""))
    text_part, html_part = _extract_body(payload)
    attachments_present = _has_attachments(payload)
    label_ids = message.get("labelIds", [])

    parsed = {}
    parsed["gmail_id"] = message["id"]
    parsed["thread_id"] = message.get("threadId", "")
    parsed["subject"] = headers.get("subject", "(no subject)")
    parsed["sender"] = address
    parsed["sender_name"] = display_name
    parsed["recipient"] = headers.get("to", "")
    parsed["date"] = sent_at
    parsed["snippet"] = message.get("snippet", "")
    parsed["body_text"] = text_part
    parsed["body_html"] = html_part
    parsed["labels"] = ",".join(label_ids)
    # Gmail marks unread messages with the UNREAD system label.
    parsed["is_read"] = not ("UNREAD" in label_ids)
    parsed["has_attachments"] = attachments_present
    parsed["message_id"] = headers.get("message-id", "")
    return parsed


def _parse_date(date_str: str) -> Optional[datetime]:
    """Parse an email date string into a datetime object."""
    from email.utils import parsedate_to_datetime
    try:
        return parsedate_to_datetime(date_str)
    except (ValueError, TypeError):
        return None


def _parse_sender(sender_raw: str) -> tuple:
    """
    Parse 'John Doe ' into (name, email).

    Returns:
        Tuple of (sender_name, sender_email).
    """
    if "<" in sender_raw and ">" in sender_raw:
        name = sender_raw.split("<")[0].strip().strip('"')
        email = sender_raw.split("<")[1].split(">")[0].strip()
        return name, email
    return "", sender_raw.strip()


def _extract_body(payload: dict) -> tuple:
    """
    Recursively extract text and HTML body from message payload.

    Gmail messages can have nested MIME parts. This function
    walks the tree to find text/plain and text/html parts.

    Returns:
        Tuple of (body_text, body_html).
    """
    body_text = ""
    body_html = ""

    mime_type = payload.get("mimeType", "")

    # Single-part message
    if mime_type == "text/plain":
        data = payload.get("body", {}).get("data", "")
        if data:
            body_text = base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")
    elif mime_type == "text/html":
        data = payload.get("body", {}).get("data", "")
        if data:
            body_html = base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")

    # Multi-part message - recurse into parts
    parts = payload.get("parts", [])
    for part in parts:
        part_text, part_html = _extract_body(part)
        if part_text and not body_text:
            body_text = part_text
        if part_html and not body_html:
            body_html = part_html

    # If we only have HTML, convert to text
    if body_html and not body_text:
        converter = html2text.HTML2Text()
        converter.ignore_links = False
        converter.ignore_images = True
        body_text = converter.handle(body_html)

    return body_text, body_html


def _has_attachments(payload: dict) -> bool:
    """Check if the message has file attachments."""
    parts = payload.get("parts", [])
    for part in parts:
        if part.get("filename"):
            return True
        # Check nested parts
        if _has_attachments(part):
            return True
    return False


def get_thread_context(
    messages: list, current_id: str, max_body_chars: int = 500
) -> str:
    """
    Build conversation context from a thread for LLM input.

    Every message except the current one is rendered as
    "From: <sender> (<date>)\\n<body>". The entries are joined with "\\n---\\n".

    Args:
        messages: List of parsed message dicts in the thread.
        current_id: Gmail ID of the current message (excluded from output).
        max_body_chars: Each body is truncated to this many characters.

    Returns:
        Formatted thread context string ("" if no other messages).
    """
    context_parts = []
    for msg in messages:
        if msg["gmail_id"] == current_id:
            continue  # Skip the current message
        # Use `or` chains so empty strings/None fall through to the next option.
        sender = msg.get("sender_name") or msg.get("sender") or "Unknown"
        # parse_message stores None when the Date header is unparseable.
        date = msg.get("date") or ""
        if isinstance(date, datetime):
            date = date.strftime("%Y-%m-%d %H:%M")
        body = (msg.get("body_text") or "")[:max_body_chars]
        context_parts.append(f"From: {sender} ({date})\n{body}")
    return "\n---\n".join(context_parts)

Storing Emails in the Database

Now tie everything together with a function that fetches emails and stores them:

# app/gmail/__init__.py
"""Gmail integration - fetch, parse, and store emails."""
from app.gmail.client import GmailClient
from app.gmail.parser import parse_message
from app.database import SessionLocal, Email


def fetch_and_store_emails(
    query: str = "is:inbox newer_than:1d",
    max_results: int = 50
) -> list:
    """
    Fetch emails from Gmail and store any new ones in the database.

    Messages whose gmail_id already exists in the Email table are skipped.
    All inserts are committed together. On any error the session is rolled
    back and the exception is re-raised.

    Args:
        query: Gmail search query.
        max_results: Maximum number of messages to fetch.

    Returns:
        List of newly stored Email objects. Their attributes are loaded
        before the session is closed, so they can be read by the caller.
    """
    client = GmailClient()
    session = SessionLocal()
    new_emails = []

    try:
        # Fetch message IDs
        message_ids = client.fetch_all_messages(
            query=query, max_total=max_results
        )

        for msg_ref in message_ids:
            msg_id = msg_ref["id"]

            # Skip if already in database
            existing = (
                session.query(Email)
                .filter(Email.gmail_id == msg_id)
                .first()
            )
            if existing:
                continue

            # Fetch full message and parse
            raw_message = client.get_message(msg_id)
            parsed = parse_message(raw_message)

            # NOTE(review): parsed["message_id"] (the RFC Message-ID header) is
            # not persisted; send_message(in_reply_to=...) needs it for replies.
            # Confirm whether the Email model has a column for it.
            email = Email(
                gmail_id=parsed["gmail_id"],
                thread_id=parsed["thread_id"],
                subject=parsed["subject"],
                sender=parsed["sender"],
                sender_name=parsed["sender_name"],
                recipient=parsed["recipient"],
                date=parsed["date"],
                snippet=parsed["snippet"],
                body_text=parsed["body_text"],
                body_html=parsed["body_html"],
                labels=parsed["labels"],
                is_read=parsed["is_read"],
                has_attachments=parsed["has_attachments"],
            )
            session.add(email)
            new_emails.append(email)

        session.commit()

        # SessionLocal's configuration is not visible here. With SQLAlchemy's
        # default expire_on_commit=True, commit() expires every instance, and
        # reading e.g. email.sender after session.close() would raise
        # DetachedInstanceError. Reload the rows while the session is open.
        for email in new_emails:
            session.refresh(email)

        print(f"Stored {len(new_emails)} new emails")

    except Exception as e:
        session.rollback()
        print(f"Error fetching emails: {e}")
        raise
    finally:
        session.close()

    return new_emails

Test the Email Integration

# Test fetching emails (run after OAuth setup)
# Run from the project root so the `app` package is importable. Prints the
# sender and subject of each newly stored email; on the first run a browser
# window may open for OAuth consent.
python -c "
from app.gmail import fetch_and_store_emails

emails = fetch_and_store_emails(
    query='is:inbox newer_than:1d',
    max_results=10
)
for e in emails:
    print(f'  [{e.sender}] {e.subject}')
print(f'Total: {len(emails)} new emails stored')
"
📝
Checkpoint: You should now be able to authenticate with Gmail, fetch your recent emails, parse them into clean structured data, and store them in SQLite. Run the test above and verify that your inbox emails are printed. If you get an authentication error, make sure the Google account you are signing in with is listed as a test user on the OAuth consent screen.

Key Takeaways

  • OAuth 2.0 handles authentication securely. The refresh token is saved so you only authorize once.
  • The Gmail API returns nested, base64-encoded payloads. Our parser recursively extracts text and HTML bodies from multipart messages.
  • Pagination with nextPageToken lets us fetch more messages than a single list request can return, since the API caps the number of message IDs per page.
  • Deduplication by gmail_id prevents storing the same email twice on repeated fetches.
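  • The send_message method supports replying within a thread. It sets the threadId on the send request and the In-Reply-To/References headers on the message. Gmail also requires the Subject to match the original thread.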

What Is Next

In the next lesson, you will build the email classification engine — using an LLM to detect priority levels, assign category tags, and analyze sentiment for every incoming email.