Building a Google Drive Watcher for Automated Workflows
The Use Case
I wanted a simple trigger for my AI pipeline: drop a file into a Google Drive folder, and the pipeline starts automatically. This is useful for workflows where non-technical collaborators upload content (images, audio files, documents) and the system processes them without any manual intervention.
Google Drive's API supports push notifications via webhooks, but for my use case a simpler polling approach works perfectly. I built a lightweight Python watcher that checks a Drive folder every 60 seconds and triggers actions when new files appear.
Setting Up Google Drive API Access
Service Account Approach
For server-side automation, a service account is the cleanest option. No OAuth consent screens, no token refresh headaches:
- Go to the Google Cloud Console
- Create a new project or select an existing one
- Enable the Google Drive API
- Create a service account and download the JSON key file
- Share your target Drive folder with the service account email
```python
from google.oauth2 import service_account
from googleapiclient.discovery import build

SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]

creds = service_account.Credentials.from_service_account_file(
    "service-account.json", scopes=SCOPES
)
service = build("drive", "v3", credentials=creds)
```
The readonly scope is sufficient for watching. If your workflow needs to move or rename files after processing, use the full drive scope instead.
Listing Files in a Folder
To watch a folder, you need its ID (the long string in the folder URL). Then query for files:
```python
def list_files(service, folder_id: str) -> list[dict]:
    results = service.files().list(
        q=f"'{folder_id}' in parents and trashed = false",
        fields="files(id, name, createdTime, mimeType)",
        orderBy="createdTime desc",
        pageSize=50,
    ).execute()
    return results.get("files", [])
```
Building the Watcher
The watcher maintains a set of known file IDs. When a new ID appears, it triggers the processing callback:
```python
import time
import logging


class DriveWatcher:
    def __init__(self, service, folder_id: str, callback):
        self.service = service
        self.folder_id = folder_id
        self.callback = callback
        self.known_files = set()
        self._initialize()

    def _initialize(self):
        files = list_files(self.service, self.folder_id)
        self.known_files = {f["id"] for f in files}
        logging.info(f"Initialized with {len(self.known_files)} existing files")

    def check(self):
        files = list_files(self.service, self.folder_id)
        current_ids = {f["id"] for f in files}
        new_ids = current_ids - self.known_files
        for file_info in files:
            if file_info["id"] in new_ids:
                logging.info(f"New file detected: {file_info['name']}")
                self.callback(file_info)
        self.known_files = current_ids

    def run(self, interval: int = 60):
        logging.info(f"Watching folder {self.folder_id} every {interval}s")
        while True:
            try:
                self.check()
            except Exception as e:
                logging.error(f"Watch error: {e}")
            time.sleep(interval)
```
The _initialize method snapshots the folder's current files on startup so the watcher does not reprocess existing content. Only files that appear after the watcher starts will trigger the callback.
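The core of check() is a plain set difference between the previous snapshot and the current listing. That step can be exercised in isolation without touching the API; a minimal sketch (detect_new is a hypothetical helper, not part of the watcher class):

```python
def detect_new(known_ids: set[str], files: list[dict]) -> tuple[set[str], set[str]]:
    """Return (IDs not seen before, updated snapshot) for a fresh listing."""
    current_ids = {f["id"] for f in files}
    return current_ids - known_ids, current_ids

# Simulate two polls: the second listing has one file the first did not.
known = {"id-a", "id-b"}
new_ids, known = detect_new(known, [{"id": "id-a"}, {"id": "id-b"}, {"id": "id-c"}])
# new_ids == {"id-c"}
```

One consequence of keying on file IDs: a file that is deleted and re-uploaded gets a new ID and fires the callback again, while a renamed file (same ID) does not.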
Downloading Files
When a new file is detected, the callback typically downloads it:
```python
from googleapiclient.http import MediaIoBaseDownload


def download_file(service, file_id: str, output_path: str):
    request = service.files().get_media(fileId=file_id)
    with open(output_path, "wb") as f:
        downloader = MediaIoBaseDownload(f, request)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            logging.info(f"Download progress: {int(status.progress() * 100)}%")
```
Triggering AI Workflows
Here is how I connect the watcher to my pipeline:
```python
import os


def on_new_file(file_info: dict):
    name = file_info["name"]
    file_id = file_info["id"]
    mime = file_info["mimeType"]

    # Ensure the download directory exists before writing into it.
    os.makedirs("/tmp/drive_downloads", exist_ok=True)
    local_path = f"/tmp/drive_downloads/{name}"
    download_file(service, file_id, local_path)

    if mime.startswith("audio/"):
        process_audio(local_path)
    elif mime.startswith("image/"):
        process_image(local_path)
    elif mime == "application/pdf":
        process_document(local_path)
    else:
        logging.warning(f"Unsupported file type: {mime}")


watcher = DriveWatcher(service, FOLDER_ID, on_new_file)
watcher.run(interval=60)
```
The callback routes files to different processing functions based on MIME type. Audio files go to the transcription pipeline, images to the thumbnail generator, and PDFs to the document analysis workflow.
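An if/elif chain is fine at three types; if the routing table grows, a prefix-based lookup keeps it declarative. A sketch of that alternative (the route function and the lambda handlers are illustrative stand-ins for the real processing functions):

```python
def route(mime, routes):
    """Return the first handler whose MIME prefix matches, else None."""
    for prefix, handler in routes:
        if mime.startswith(prefix):
            return handler
    return None

# Hypothetical handlers; in the real pipeline these would be
# process_audio, process_image, and process_document.
routes = [
    ("audio/", lambda path: f"transcribe {path}"),
    ("image/", lambda path: f"thumbnail {path}"),
    ("application/pdf", lambda path: f"analyze {path}"),
]

handler = route("audio/mpeg", routes)
```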
Production Hardening
For reliable operation, I added several improvements:
- Persistence: Save known file IDs to a JSON file so the watcher survives restarts without reprocessing
- Error recovery: Wrap the callback in try/except and send Telegram alerts on failure
- Deduplication: Check if a file has already been processed before downloading
- PM2 management: Run the watcher as a PM2 process for automatic restarts
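The persistence point above takes only a few lines: serialize the known-ID set to JSON after every poll and reload it on startup. A minimal sketch (STATE_FILE is a hypothetical location; the real watcher would take it as a parameter):

```python
import json
from pathlib import Path

STATE_FILE = Path("/tmp/drive_watcher_state.json")  # hypothetical location


def load_known_files() -> set[str]:
    # An absent state file means a fresh start.
    if STATE_FILE.exists():
        return set(json.loads(STATE_FILE.read_text()))
    return set()


def save_known_files(known: set[str]) -> None:
    # Sets are not JSON-serializable; a sorted list keeps the file stable.
    STATE_FILE.write_text(json.dumps(sorted(known)))
```

With this in place, _initialize would call load_known_files() instead of snapshotting the folder, and check() would call save_known_files() after updating the set.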
Alternatives Considered
Google Drive push notifications via webhooks would be more efficient than polling, but they require a publicly accessible HTTPS endpoint and webhook verification. For a server behind a firewall or without a domain, polling is simpler and perfectly adequate at 60-second intervals. The API quota cost is negligible.
For more complex workflows, consider using Google Cloud Functions triggered by Drive events. But for a single-folder watcher, the polling approach is the right level of complexity.