Making a WhatsApp AI Agent with GPT-4o | by Lukasz Kowejsza | Dec, 2024
Since our server is operating domestically, the WhatsApp Webhook can’t name the endpoint for verification. What we’d like is a public URL that can be utilized by the webhook. There are two choices: deploy the appliance to a cloud server or create a proxy server tunnel. Since we’re nonetheless within the growth course of, we are going to use the second choice.
- Go to ngrok Signup and create a free account.
- Set up ngrok domestically. Relying in your system, you should use Brew, Chocolatey, or just obtain and set up it. See: Setup & Installation.
- After set up, add your authentication code utilizing the next command in your terminal. Exchange
$YOUR-AUTHENTICATION_TOKEN
together with your ngrok authentication token, which might be discovered underneath “Your Authtoken” within the ngrok dashboard. - Start forwarding visitors out of your localhost on port 8000 by operating the next command in your terminal:
> ngrok config add-authtoken $YOUR-AUTHENTICATION_TOKEN
> ngrok http http://localhost:8000Forwarding https://<random-string>.ngrok.io -> http://localhost:8000
Your native server is now accessible by way of public URLs offered by ngrok. You need to see one thing like this:
Forwarding https://<random-string>.ngrok.io -> http://localhost:8000
Use the HTTPS URL offered by ngrok for the webhook configuration.
Now allow us to return to Meta’s Cloud API to implement the specified webhook.
- Navigate to Meta for Developers and choose the app created earlier than.
- Within the left-hand menu go to WhatsApp > Configuration.
- Within the Webhook part paste your ngrok HTTPS forwarding URL into the Callback URL subject and enter the
VERIFICATION_TOKEN
outlined inmost important.py
into the Verification Token subject. - Click on the verify and save button and look forward to the webhook to confirm your backend.
- Within the part Webhook Fields allow the
messages
toggle underneath Subscribed Fields.
That’s it! You need to now have the ability to obtain WhatsApp messages in your Python backend server.
Webhooks are HTTP callbacks that allow applications to obtain real-time updates when sure occasions happen corresponding to a brand new message or a standing change. Webhooks make system integrations and automation attainable by delivering an HTTP request containing occasion knowledge to a pre-configured URL (in our case the ngrok proxy server url).
To grasp the logic and pricing behind webhooks within the Meta cosmos it’s useful to grasp some fundamental rules about conversations.
A ‘dialog’ on WhatsApp API begins when:
1. The Person sends a message: This opens a 24-hour window, throughout which you’ll reply with messages together with textual content, photos, or different media with out further prices.
2. The Enterprise Initiates Contact: If no consumer message has been obtained lately (no open 24-hour window), your AI assistant should use a pre-approved template message to begin the dialog. You possibly can add customized templates however they have to be accredited by Meta.
So long as the consumer retains replying, the 24-hour window resets with every new message. This makes it attainable to have steady interplay with out further prices. A Dialog prices about 0.00–0.08 USD. The concrete pricing is predicated on you dialog sort Advertising, Utility, Service and your location. FYI: Service Conversations appear to be these days totally free. You will discover the concrete pricing right here: Whatsapp Pricing
Now we’re in a position to obtain messages in our backend. Since we now have subscribed to message objects, every time a message is shipped to your take a look at quantity, the webhook will create a POST request to the callback URL that you simply outlined within the earlier step. What we have to do subsequent is to construct an endpoint for POST requests in our FastAPI software.
Allow us to first outline the necessities:
- Return a 200 HTTP Standing Code: That is important to tell CloudAPI that the message has been obtained efficiently. Failing to take action will trigger CloudAPI to retry sending the message for as much as 7 days.
- Extract Telephone Quantity and Message: The payload of the incoming request incorporates knowledge that features the cellphone quantity and the message. Which we have to course of within the backend.
- Filter Incoming Objects: Since CloudAPI would possibly ship a number of occasions for a similar message (corresponding to despatched, obtained, and browse), the backend must ensures that just one occasion of the message is processed.
- Deal with A number of Message Varieties: The backend can deal with various kinds of messages, corresponding to textual content, voice messages, and pictures. With a view to not unfold the scope of the artice we are going to solely lay the inspiration for photos however not implement it to the tip.
- Course of with LLM-Agent Workflow: The extracted data is processed utilizing the LLM-Agent workflow, which we now have developed with earlier elements of this collection. You may also use one other agentic implementation, e.g. Langchain or Langgraph
We’ll obtain a payload from a webhook. You will discover instance payloads in Meta’s documentation: Example Payload
I favor to jot down my code with Pydantic so as to add sort security to my Python code. Furthermore, sort annotations and Pydantic are an optimum match for FastAPI purposes. So, let’s first outline the fashions utilized in our endpoint:
# app/schema.py
from typing import Checklist, Non-compulsory
from pydantic import BaseModel, Subject class Profile(BaseModel):
identify: str
class Contact(BaseModel):
profile: Profile
wa_id: str
class Textual content(BaseModel):
physique: str
class Picture(BaseModel):
mime_type: str
sha256: str
id: str
class Audio(BaseModel):
mime_type: str
sha256: str
id: str
voice: bool
class Message(BaseModel):
from_: str = Subject(..., alias="from")
id: str
timestamp: str
textual content: Textual content | None = None
picture: Picture | None = None
audio: Audio | None = None
sort: str
class Metadata(BaseModel):
display_phone_number: str
phone_number_id: str
class Worth(BaseModel):
messaging_product: str
metadata: Metadata
contacts: Checklist[Contact] | None = None
messages: Checklist[Message] | None = None
class Change(BaseModel):
worth: Worth
subject: str
statuses: Checklist[dict] | None = None
class Entry(BaseModel):
id: str
modifications: Checklist[Change]
class Payload(BaseModel):
object: str
entry: Checklist[Entry]
class Person(BaseModel):
id: int
first_name: str
last_name: str
cellphone: str
function: str
class UserMessage(BaseModel):
consumer: Person
message: str | None = None
picture: Picture | None = None
audio: Audio | None = None
Subsequent, we’re going to create some helper capabilities for utilizing dependency injection in FastAPI:
# app/most important.pyfrom app.area import message_service
def parse_message(payload: Payload) -> Message | None:
if not payload.entry[0].modifications[0].worth.messages:
return None
return payload.entry[0].modifications[0].worth.messages[0]
def get_current_user(message: Annotated[Message, Depends(parse_message)]) -> Person | None:
if not message:
return None
return message_service.authenticate_user_by_phone_number(message.from_)
def parse_audio_file(message: Annotated[Message, Depends(parse_message)]) -> Audio | None:
if message and message.sort == "audio":
return message.audio
return None
def parse_image_file(message: Annotated[Message, Depends(parse_message)]) -> Picture | None:
if message and message.sort == "picture":
return message.picture
return None
def message_extractor(
message: Annotated[Message, Depends(parse_message)],
audio: Annotated[Audio, Depends(parse_audio_file)],
):
if audio:
return message_service.transcribe_audio(audio)
if message and message.textual content:
return message.textual content.physique
return None
- Parsing the Payload: The
parse_message
perform extracts the primary message from the incoming payload if it exists. This perform returnsNone
if no messages are discovered, in order that solely legitimate messages are processed. - Person Authentication: The
get_current_user
perform makes use of theparse_message
dependency injection to extract the message after which authenticates the consumer primarily based on the cellphone quantity related to the message. Right here we be sure that solely authenticated customers are allowed to ship messages. - Audio and Picture Parsing: These capabilities extract audio or picture information from the message if the message sort is “audio” or “picture,” respectively. This enables the appliance to deal with various kinds of media.
- Message Extraction: The
message_extractor
perform makes an attempt to extract textual content from the message or transcribe audio into textual content. This ensures that whatever the message sort, the content material might be processed.
Right here we now have one import from our area layer. The entire script message_service
is the place we place all domain-specific code for this implementation, corresponding to authenticate_user_by_phone_number
and transcribe_audio
.
# app/most important.py
import threading
from typing_extensions import Annotated
from fastapi import APIRouter, Question, HTTPException, Relies upon
from app.area import message_service
from app.schema import Payload, Message, Audio, Picture, Person # ... remainder of the code ...
@app.submit("/", status_code=200)
def receive_whatsapp(
consumer: Annotated[User, Depends(get_current_user)],
user_message: Annotated[str, Depends(message_extractor)],
picture: Annotated[Image, Depends(parse_image_file)],
):
if not consumer and never user_message and never picture:
return {"standing": "okay"}
if not consumer:
increase HTTPException(status_code=401, element="Unauthorized")
if picture:
return print("Picture obtained")
if user_message:
thread = threading.Thread(
goal=message_service.respond_and_send_message,
args=(user_message, consumer)
)
thread.daemon = True
thread.begin()
return {"standing": "okay"}
- POST Endpoint Implementation: This endpoint handles the incoming POST request. It checks if the consumer, message, or picture is legitimate. If none are legitimate, it merely returns a standing message to CloudAPI. If the consumer will not be authenticated, it raises an
HTTPException
with a 401 standing code. - Processing Photos and Messages: If a picture is obtained, we make a easy stdout print as a placeholder for future picture dealing with. If a textual content message is obtained, it’s processed asynchronously utilizing a separate thread to keep away from blocking the primary software thread. The
message_service.respond_and_send_message
perform is invoked to deal with the message in line with the LLM-Agent workflow.
Clarification for Utilizing Thread Pooling for the Webhook: WhatsApp will resend the webhook till it will get a 200 response, so thread pooling is used to make sure that message dealing with doesn’t block the webhook response.
In our presentation layer the place we beforehand outlined our endpoint, we use some message_service
capabilities that have to be outlined subsequent. Particularly, we’d like an implementation for processing and transcribing audio payloads, authenticating customers, and at last invoking our agent and sending a response again. We’ll place all this performance inside area/message_service.py
. In manufacturing settings, as your software grows, I’d advocate splitting them additional down into, e.g., transcription_service.py
, message_service.py
, and authentication_service.py
.
In a number of capabilities on this part, we are going to make requests to the Meta API "https://graph.fb.com/..."
. In all of those requests, we have to embrace authorization headers with WHATSAPP_API_KEY
, which we created in step 1.3, because the bearer token. I normally retailer API keys and tokens in an .env
file and entry them with the Python dotenv
library. We additionally use the OpenAI consumer together with your OPENAI_API_KEY
, which is also saved within the .env
file.
However for simplicity, let’s simply place and initialize them on the high of message_service.py
scripts as follows:
import os
import json
import requests
from typing import BinaryIOWHATSAPP_API_KEY = "YOUR_ACCESS_TOKEN"
llm = OpenAI(api_key="YOUR_OPENAI_API_KEY")
Exchange “YOUR_ACCESS_TOKEN” together with your precise entry token that you simply created in step 1.3.
Dealing with voice data from a WhatsApp webhook will not be as simple as it might appear. To start with, you will need to know that the incoming webhook solely tells us the info sort and an object ID. So it doesn’t comprise the binary audio file. We first should obtain the audio file utilizing Meta’s Graph API. To obtain our obtained audio, we have to make two sequential requests. The primary one is a GET request with the object_id
to acquire the obtain URL. This obtain URL is the goal of our second GET request.
def download_file_from_facebook(file_id: str, file_type: str, mime_type: str) -> str | None:
# First GET request to retrieve the obtain URL
url = f"https://graph.fb.com/v19.0/{file_id}"
headers = {"Authorization": f"Bearer {WHATSAPP_API_KEY}"}
response = requests.get(url, headers=headers)
if response.status_code == 200:
download_url = response.json().get('url')
# Second GET request to obtain the file
response = requests.get(download_url, headers=headers)
if response.status_code == 200:
# Extract file extension from mime_type
file_extension = mime_type.break up('/')[-1].break up(';')[0]
# Create file_path with extension
file_path = f"{file_id}.{file_extension}"
with open(file_path, 'wb') as file:
file.write(response.content material)
if file_type == "picture" or file_type == "audio":
return file_path
increase ValueError(f"Did not obtain file. Standing code: {response.status_code}")
increase ValueError(f"Did not retrieve obtain URL. Standing code: {response.status_code}")
Right here, we mainly get the obtain URL and obtain the file to the file system utilizing the thing ID and the file extension as its file_path
. If one thing fails, we increase a ValueError
that signifies the place the error occurred.
Subsequent, we merely outline a perform that takes the audio binary and transcribes it utilizing Whisper:
def transcribe_audio_file(audio_file: BinaryIO) -> str:
if not audio_file:
return "No audio file offered"
strive:
transcription = llm.audio.transcriptions.create(
file=audio_file,
mannequin="whisper-1",
response_format="textual content"
)
return transcription
besides Exception as e:
increase ValueError("Error transcribing audio") from e
And eventually, let’s convey the obtain and transcription capabilities collectively:
def transcribe_audio(audio: Audio) -> str:
file_path = download_file_from_facebook(audio.id, "audio", audio.mime_type)
with open(file_path, 'rb') as audio_binary:
transcription = transcribe_audio_file(audio_binary)
strive:
os.take away(file_path)
besides Exception as e:
print(f"Did not delete file: {e}")
return transcription
Whereas utilizing the take a look at quantity offered by Meta, we now have to predefine which numbers our chatbot can ship messages to. I’m not fairly certain and haven’t examined if any quantity can ship a message to our chatbot. However anyway, as quickly as we change to a customized quantity, we don’t need anybody to have the ability to execute our agent chatbot. So we’d like a technique to authenticate the consumer. Now we have a number of choices to do that. To start with, we now have to consider the place to retailer consumer data. We may use, for instance, a database like PostgreSQL or a non-relational database like Firestore. We are able to predefine our customers within the file system in a JSON file or in an .env
file. For this tutorial, I’ll go together with the only method and hardcode the consumer inside an inventory in our authentication perform.
An inventory entry has the construction of the Person
mannequin as outlined in step 5.1. So a consumer consists of an ID, first identify, final identify, and cellphone quantity. Now we have not carried out a job system in our agent workflow but. However in most use instances with totally different customers, corresponding to within the instance case of a small enterprise assistant, totally different customers may have totally different rights and entry scopes. For now, we simply move "default"
as a placeholder function.
def authenticate_user_by_phone_number(phone_number: str) -> Person | None:
allowed_users = [
{"id": 1, "phone": "+1234567890", "first_name": "John", "last_name": "Doe", "role": "default"},
{"id": 2, "phone": "+0987654321", "first_name": "Jane", "last_name": "Smith", "role": "default"}
]
for consumer in allowed_users:
if consumer["phone"] == phone_number:
return Person(**consumer)
return None
So simply confirm if the cellphone quantity is in our listing of allowed_users
and return the consumer whether it is. In any other case, we return None
. If you happen to take a look at our endpoint in step 5.3, you will notice we increase an error if the consumer is None
to stop additional processing of unauthorized consumer messages.
Now, our final helper perform earlier than we are able to really invoke our agent is send_whatsapp_message
. I’ve included two modes into this perform due to some Meta-specific WhatsApp API logic.
Principally, you aren’t allowed to ship a customized message to a consumer as a dialog starter. This implies you’ll be able to reply with a person textual content message if the consumer begins the dialog and writes a message to the chatbot first. In any other case, if you’d like the chatbot to provoke a dialog, you might be restricted to accredited templates, just like the “Good day World” template.
Additionally essential to say, once we discuss Meta logic, a dialog after being began opens a dialog window of 24 hours in which you’ll ship messages to that consumer. This dialog window can be what will get charged, not the person message. It will get a bit extra complicated primarily based on the kind of dialog, corresponding to advertising and marketing, assist, and so forth.
You may also outline a template by yourself and let it’s accredited by Meta. I’ve not executed that at this level, so to check if we are able to ship a message from our backend to a consumer, I exploit the “Good day World” template. If you happen to add some customized accredited templates, you may also use this perform to ship them to the consumer.
So again to the code. To ship a message, we make a POST request and outline a payload that both contains the textual content physique or the template:
def send_whatsapp_message(to, message, template=True):
url = f"https://graph.fb.com/v18.0/289534840903017/messages"
headers = {
"Authorization": f"Bearer " + WHATSAPP_API_KEY,
"Content material-Kind": "software/json"
}
if not template:
knowledge = {
"messaging_product": "whatsapp",
"preview_url": False,
"recipient_type": "particular person",
"to": to,
"sort": "textual content",
"textual content": {
"physique": message
}
}
else:
knowledge = {
"messaging_product": "whatsapp",
"to": to,
"sort": "template",
"template": {
"identify": "hello_world",
"language": {
"code": "en_US"
}
}
} response = requests.submit(url, headers=headers, knowledge=json.dumps(knowledge))
return response.json()
Lastly, we are able to combine our agent from our earlier examples. At this stage, you may also combine your customized agent, a Langchain AgentExecutor
, Langgraph AgentWorkflow
, and so forth.
So our most important perform that will probably be referred to as on every incoming message is respond_and_send_message
, which takes the user_message
string and passes it to our agent workflow because the enter object.
# app/area/message_service.py
import json
import requests
from app.area.brokers.routing_agent import RoutingAgent
from app.schema import Person def respond_and_send_message(user_message: str, consumer: Person):
agent = RoutingAgent()
response = agent.run(user_message, consumer.id)
send_whatsapp_message(consumer.cellphone, response, template=False)
After invoking our agent, we get a response message that we wish to ship again to the consumer utilizing the send_whatsapp_message perform.