Jose Ramirez
JoseTheEngineer's Blog

JoseTheEngineer's Blog

Getting your top Gmail senders with the Google Cloud API using Python

Getting your top Gmail senders with the Google Cloud API using Python

A fun intro to begin analyzing your G-Mail account emails

Jose Ramirez's photo
Jose Ramirez
·Nov 7, 2021·

8 min read

Hello everyone, welcome to my first blog post! Over the past month, I have been working on cleaning out my Gmail inbox because it currently has 28,000 unread emails. As a busy programmer, I instead switched over to using my Hotmail until I decided some of the cleanup in my Gmail can be automated in python!

As a first step towards figuring out what to delete from my email, I've created a way to look at which senders are in top n senders in my last m emails. The project requires you to create a Google Cloud Platform project and enable the Gmail API.

If at any point you want to check your code against mine you can find it on this gist.

1. Create a Google Cloud Platform project and enable the Gmail-API .

Create a project using this link by clicking Create Project and naming your project.

image.png

Once it is created, enable the Gmail API

image.png

image.png

Once the API is enabled, go into the API Settings and create OAuth credentials. You may have to fill out a consent form before you are allowed to create credentials.

image.png

Download the json file and save it as credentials.json.

image.png

2. Create Virtual Environment for the project

Create a virtual environment for the project. In my case, I used python 3.9.5.

python -m venv venv

Activate the venv

source venv/bin/activate

Install python requirements from my gist [requirements.txt] (gist.githubusercontent.com/jramirez857/9b81..) file.

pip install -r requirements.txt

3. Create Gmail Service class in gmail.py

"""
This module contains the GmailAuth class. It is used to authenticate with
Gmail.
"""
import argparse
import os.path as p
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials


# Sets the scope and application name for the project. This is required
# for the flow to create the necessary credentials.
SCOPES = ["https://mail.google.com/"]
APPLICATION_NAME = "Gmail API Python"




class GmailService:
    """
    The GmailService class is used to authenticate with the Gmail API.
    """

    def __init__(self, **kwargs):
        self.credentials_path = kwargs.get("credentials", "credentials.json")

    def _get_credentials(self, token_file="token.json"):
        """
        Checks if the token.json file exists and uses it if it does.
        If it doesnt, it checks for the credentials.json file, runs through the flow and
        saves the credentials for the next run.
        """
        user_credentials = None
        if p.exists(token_file):
            user_credentials = Credentials.from_authorized_user_file(token_file, SCOPES)
            if user_credentials.expired and user_credentials.refresh_token:
                user_credentials.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                self.credentials_path, SCOPES
            )
            user_credentials = flow.run_local_server(port=0)
        with open(token_file, "w") as token:
            token.write(user_credentials.to_json())
        return user_credentials

    def get(self):
        """
        Returns the service object for the email API.
        """
        return build("gmail", "v1", credentials=self._get_credentials())

What the above code does when you instantiate GmailService() and call the get() method:

  1. Gets the credentials by calling the self._get_credentials() method. This method checks if the authentication token exists already in token.json. If it does not exist it will launch a browser window for authentication and save the token.json file for next time we run the script.

  2. Once we have the credentials, the gmail service is built and returned.

4. Create the TopSenders class in top_senders.py

"""
This module implements the TopSenders class and GMail pydantic model. 
"""

from collections import defaultdict, OrderedDict
import pprint
import logging
import argparse
import progressbar
from gmail import GmailService
from pydantic import BaseModel



pp = pprint.PrettyPrinter(indent=4)

class GMail(BaseModel):
    id: str
    sender: str



class TopSenders:
    """
    This class is used to get the list of senders for a given number of emails.

    :param gmail: The Gmail API Service. Gets created by default if not provided.

    """

    def __init__(self, **kwargs):
        self.gmail = kwargs.get("gmail_service", GmailService().get())
        logging.basicConfig(
            level=kwargs.get("log_level", logging.DEBUG),
            format="%(funcName)s():%(lineno)i: %(levelname)s: %(message)s",
        )

    def _parse_email(self, message) -> GMail:
        """
        Parses the email and returns a Gmail email object.

        :param message: The message to parse.
        :return: An Email object.
        """
        _email = self.get_email_by_id(message["id"])
        _sender = self._get_sender(_email)
        return GMail(id=_email["id"], sender=_sender)

    def _extract_email_info(self, messages: list) -> list:
        """
        Extracts the needed information for emails in a response message.

        :param messages: The list of messages to extract email info from.
        :return emails: A list of Emails that were extracted from the messages.
        """
        emails = []
        for message in messages:
            _email = self._parse_email(message)
            emails.append(_email)
        return emails

    def _get_response(self, token=None) -> list:
        """
        Gets the messages from the Gmail API for the logged in user.
        """
        return (
            self.gmail.users().messages().list(userId="me", pageToken=token).execute()
            if token
            else self.gmail.users().messages().list(userId="me").execute()
        )

    def _get_num_emails(self, num: int) -> list:
        """
        Gets a batch of messages from the Gmail class for the logged in user.
        """
        response = self._get_response()
        emails = []
        logging.info("Getting %d emails", num)
        for _ in progressbar.progressbar(range(0, num, len(response["messages"]))):
            messages = response["messages"]
            emails.extend(self._extract_email_info(messages))
            if response.get("nextPageToken", None) is not None:
                response = self._get_response(response["nextPageToken"])
            else:
                logging.warning("No more messages")
                break
        logging.info("Successfully retrieved %d messages", len(emails))
        return emails

    def _get_sender(self, email) -> str:
        """
        Extracts the sender from the email and adds it to the senders dictionary.
        Increments the count for the sender in the dictionary.

        :param email: The email to extract the sender from.
        :return: The sender of the email.
        """
        sender = "Unknown Sender"
        for header in email["payload"]["headers"]:
            if header["name"] == "From":
                sender = header["value"]
                break
        return sender

    def get_email_by_id(self, message_id, user="me") -> dict:
        """
        Gets the email by the message id for specified user.

        :param message_id: The message to get the email for.
        :param user: The user to get the email for.
        :return: A dict with headers of the email with the passed in id
        """
        return self.gmail.users().messages().get(userId=user, id=message_id).execute()

    def get(self, num_emails: int, num_senders: int = 10) -> OrderedDict:
        """
        Gets a list of top senders from the Gmail API for the logged in user.

        :param num_emails: The number of emails to get the senders for.
        :param num_senders: The number of senders to return.
        :return: A list of the top senders of size num_senders.

        """
        senders = defaultdict(int)
        emails = self._get_num_emails(num=num_emails)
        logging.info(f"Getting top senders for { len(emails) } number of emails.")
        for email in progressbar.progressbar(emails):
            senders[email.sender] += 1
        top_senders = sorted(senders, key=senders.get, reverse=True)[:num_senders]
        logging.info("top senders: %s", top_senders)
        return top_senders


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Get the senders for a given number of emails."
    )
    parser.add_argument(
        "-n",
        "--num_emails",
        type=int,
        default=1000,
        help="The number of emails to get the senders for.",
    )
    parser.add_argument(
        "-s",
        "--num_senders",
        type=int,
        default=10,
        help="The number of top senders to return.",
    )
    args = parser.parse_args()
    top_senders = TopSenders(log_level=logging.INFO).get(num_emails=args.num_emails, num_senders=args.num_senders)
    pp.pprint(top_senders)

There is a lot going on in the class above but lets break it down. Here is what happens when we run

python top_senders.py
  1. We take in command line parameters using argparse. These parameters have defaults set to 1000 for --num_emails and 10 for --num_senders. We take in these arguments into args using

    parser = argparse.ArgumentParser(
         description="Get the senders for a given number of emails."
     )
     parser.add_argument(
         "-n",
         "--num_emails",
         type=int,
         default=1000,
         help="The number of emails to get the senders for.",
     )
     parser.add_argument(
         "-s",
         "--num_senders",
         type=int,
         default=10,
         help="The number of top senders to return.",
     )
     args = parser.parse_args()
    
  2. These parameters get passed into the TopSenders classes get() method as they are parameters we can configure for getting different numbers of emails and different numbers of senders.

    top_senders = TopSenders(log_level=logging.INFO).get(num_emails=args.num_emails, num_senders=args.num_senders)
    
  3. The TopSenders get() method gets called. The get() method first initiates an empty defaultdict senders. This defaultdict will have a default value of 0 for each sender we add to the dict that is not yet in the dict. Next, it retrieves the number of emails that was provided as user input. What the _get_num_emails() method does is get the number of emails we are asking for by requesting them from the Gmail service 100 at a time. It stops once there is no nextPageToken key in the API response or if it reaches the amount of emails we requested. The emails are returned as a list of our Pydantic GMail models we created at the top of the file.

    senders = defaultdict(int)
    emails = self._get_num_emails(num=num_emails)
    
  4. Once we have the list of emails, we can loop through them and add the sender to our defaultdict and increment it by 1 each time we see the same sender. The progress bar package provides us with a neat way to view progress while the script is running through the list of emails.
for email in progressbar.progressbar(emails):
            senders[email.sender] += 1
  1. Finally, we sort the senders defaultdict by its values in reverse order and return the top number of senders we provided to the script. top_senders gets logged with logging.info().
top_senders = sorted(senders, key=senders.get, reverse=True)[:num_senders]
        logging.info("top senders: %s", top_senders)
  1. top_senders gets returned by the get() method

6. Run the code!

Lets run our code and get the top 20 senders for our last 4000 emails. Be sure that the credentials.json file from step 1 is in the same folder as our top_senders.py script.

python top_senders.py --num_emails 4000 --num_senders 20

Obtaining 4000 emails from the GMail API takes a bit but eventually we get a list of emails in the output. You should get some output similar to below.

_get_num_emails():81: INFO: Getting 4000 emails
100% (40 of 40) |###################################################################################################################| Elapsed Time: 0:12:31 Time:  0:12:31
_get_num_emails():90: INFO: Successfully retrieved 4000 messages
get():128: INFO: Getting top senders for 4000 number of emails.
100% (4000 of 4000) |###############################################################################################################| Elapsed Time: 0:00:00 Time:  0:00:00

The output should also contain a list of your top 20 senders in your last 4000 emails! If it does not, feel free to check your code against mine in the gist and comment below if I may be of help!

Enjoyed this tutorial? Follow me for future tutorials as I add more features to build out my email cleaner. In the meantime, feel free to connect with me on Twitter @josetheengineer !

 
Share this