Integrate your API with a Temporal Application in Python
- Level: Temporal beginner
- Time: ~25 minutes
- Goals:
- Build and test a Temporal Application with Python SDK.
- Implement a client for Workflow Management.
- Integrate external Web API.
- Understand Durable Executions.
Introduction
When writing business logic, guaranteeing a consistent response involving an API can be challenging. APIs are often subject to issues like traffic-induced throttling, unexpected downtimes, or other external disruptions. These challenges require a robust approach to maintain reliable and performant applications.
Temporal lets you write durable executions with the programming languages you're already familiar with, handling unpredictable API communications effectively. Mechanisms like state, retries, and error handling integrate seamlessly into the Temporal Workflow, ensuring your API calls are made in a resilient manner. This approach enhances your application's ability to deal with inherently unreliable external services.
The aim of this tutorial is to progressively integrate a weather forecasting API into your web application. This journey will demonstrate how durable functions can effectively mitigate the challenges posed by APIs, ensuring your applications remain robust and responsive.
Overview
Following the foundational guide Build a Temporal Application from scratch in Python, you'll elevate your skills in developing Temporal Applications using the Python SDK. You'll write a Temporal Application with a web application for weather forecasting, connecting a frontend client to the backend process.
Here's what you'll explore and accomplish:
- Workflow (the orchestrator): You'll delve deep into the Workflow Class, the central orchestrator of your Temporal Application. This class meticulously outlines the sequence of operations, directing the call to the weather API.
- Activity (the executor): The Activity is where the action happens. This functional unit is key to performing tasks like fetching and processing data, bridging the gap between your Workflow's instructions and the external API's responses.
- Worker (the performer): Workers are the powerhouse of the Temporal framework, executing the code of your Workflows and Activities. This section explains how Workers function, hosting and running the various pieces of your application's logic.
- Testing with Pytest (ensuring reliability): Testing is an integral part of software development, particularly for robust applications. This tutorial includes a segment on writing tests using pytest, enabling you to verify the successful execution of your Workflow.
- Client Configuration (the bridge to action): A pivotal aspect of any Temporal Application is configuring the client. You'll go through the process of setting up and customizing the client, which acts as a conduit between your frontend (like a web application) and the Temporal backend.
Upon completing this tutorial, you'll have a working Temporal Application and a comprehensive understanding of the interactions between its various components. This experience will prove invaluable as you continue to build sophisticated, scalable web applications using Temporal.
All necessary code and resources are available in the hello-weather-python-template repository.
Prerequisites
Before starting this tutorial:
- Build a Temporal Application from scratch in Python
This tutorial was written using:
- Temporal Python SDK 1.4
- Python 3.12
- Flask 2.2.3 with the async extra: pip install "Flask[async]"
- National Weather Service (NWS) API accessed January 01, 2024: https://api.weather.gov
The National Weather Service API was chosen for this project for its frictionless developer experience: you can access it without any additional sign-up. The API is not intended to be used for commercial purposes.
Modify the Workflow
In Temporal, a Workflow orchestrates Activities. The Workflow defines the logic and sequencing of tasks, while Activities perform the actual work. The Workflow executes Activities based on their defined order and dependencies.
To demonstrate a more complex use case, transition from the simple SayHello Workflow to a more advanced WeatherWorkflow. This Workflow will allow you to call a weather service API for forecasting.
Update Workflow imports
First, you'll need to prepare the essential tools. Open your workflows.py file and add the following import statements:
# workflows.py
from datetime import timedelta

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import WeatherActivities, WeatherParams
By importing these classes, you're equipping the WeatherWorkflow with capabilities to fetch and process weather data. The WeatherActivities and WeatherParams classes, imported from the activities.py file, will play a key role in the subsequent steps.
Define Workflow class
Now, you define the WeatherWorkflow class.
This class is responsible for orchestrating the execution of the get_weather Activity from the WeatherActivities class.
It takes in a WeatherParams object containing location information.
The Workflow then passes this information to the Activity, which calls the weather API and returns the forecast data.
# workflows.py
@workflow.defn
class WeatherWorkflow:
    @workflow.run
    async def run(self, weather_params: WeatherParams) -> list[dict]:
        forecast_periods = await workflow.execute_activity(
            WeatherActivities.get_weather,
            weather_params,
            schedule_to_close_timeout=timedelta(seconds=10),
        )
        return forecast_periods
After executing the Activity, the Workflow returns the final list of weather forecast periods. WeatherWorkflow not only defines the Workflow logic for retrieving weather data but also coordinates the sequencing of the Activity execution.
Moreover, it's important to consider error handling and retries. The Retry Policy is crucial in this context. The default policy is:
Initial Interval = 1 second
Backoff Coefficient = 2.0
Maximum Interval = 100 × Initial Interval
Maximum Attempts = ∞
Non-Retryable Errors = []
This default policy applies to every Activity the Workflow executes unless you override it.
It's particularly useful when dealing with rate limits or throttling issues from APIs. For instance, the NWS API documentation states:
The rate limit is not public information, but allows a generous amount for typical use. If the rate limit is exceeded a request will return with an error, and may be retried after the limit clears (typically within 5 seconds).
Therefore, in cases where the API's rate limit affects your Activity, the Workflow will automatically retry the Activity, in line with the defined Retry Policy.
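To get a feel for what the default policy means in practice, the following sketch computes the wait before each retry from the three values listed above. It is a simplified model using only the standard library, not the Temporal Service's actual scheduling code, and the function name retry_delays is made up for this illustration.

```python
from datetime import timedelta
from typing import Optional


def retry_delays(
    attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> list[timedelta]:
    """Return the wait before each retry, given a total number of attempts."""
    # Mirror the default from the list above: 100 x the initial interval.
    if maximum_interval is None:
        maximum_interval = initial_interval * 100
    delays = []
    # The first attempt runs immediately, so there are attempts - 1 waits.
    for retry in range(attempts - 1):
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


schedule = retry_delays(attempts=5)
print([d.total_seconds() for d in schedule])  # [1.0, 2.0, 4.0, 8.0]
```

Keep in mind that a Schedule-To-Close Timeout bounds the total time across all attempts, so the 10-second timeout used in this tutorial would end the Activity before the later waits in this schedule are reached.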
If you wanted to modify the default Retry Policy, you'd pass a RetryPolicy as the retry_policy argument when the Workflow executes the Activity:
# Add to the imports at the top of workflows.py
from temporalio.common import RetryPolicy

# Inside WeatherWorkflow.run
forecast_periods = await workflow.execute_activity(
    WeatherActivities.get_weather,
    weather_params,
    schedule_to_close_timeout=timedelta(seconds=10),
    retry_policy=RetryPolicy(
        backoff_coefficient=3.0,
        maximum_attempts=5,
        # initial_interval=timedelta(seconds=1),
        # maximum_interval=timedelta(seconds=2),
        # non_retryable_error_types=["ValueError"],
    ),
)
With these modifications in place, you're now ready to update and refine the Activity as well.
Modify the Activity
Activities in a Temporal application represent the actual work units.
They encapsulate business logic for performing specific tasks.
In the Build a Temporal Application: Create an Activity section, you created a simple SayHello Activity.
Let's expand the scope of this Activity to include the functionality to fetch weather data from the National Weather Service API. In this section, you'll create a dataclass for handling parameters for the weather API and then define a class for managing the Activity itself, including the logic to call the weather API and process its response.
Update Activity imports
Before diving into the Activity's business logic, consider the role of the Activity file. It's where you execute operations that are prone to failure, such as network calls.
While this tutorial uses aiohttp, you can use any HTTP client library you like; however, you should choose a library that supports the call style, asynchronous or synchronous, that your Activities use.
For more information, see Asynchronous vs. Synchronous Activity implementations.
Import the following modules into your file:
# activities.py
import socket
from dataclasses import dataclass
import aiohttp
from aiohttp import TCPConnector
from temporalio import activity
The aiohttp modules are used to make asynchronous network calls to the weather API.
The dataclass decorator is used to futureproof arguments sent to the Workflow.
The activity module is used to define the Activity.
Define the dataclass
Transitioning to data management, Temporal promotes the use of data classes in Python for several reasons, including compatibility, flexibility, and ease of serialization.
The WeatherParams dataclass you'll define now represents the input parameters for the weather-related Activities, such as office location and grid coordinates.
# activities.py
@dataclass
class WeatherParams:
    office: str
    gridX: int
    gridY: int
Temporal encourages the use of data classes in Python for several reasons:
- Compatibility: Data classes allow you to add fields without breaking compatibility. This is particularly useful when defining parameters and return types for your Temporal workflows.
- Flexibility: Temporal Workflows can have any number of custom parameters. Using objects (like data classes) as parameters allows individual fields to be altered without breaking the signature of the workflow.
- Serialization: All Workflow definition parameters must be serializable. Dataclasses in Python, when properly type-annotated, can be easily serialized and deserialized, making them a good fit for this requirement.
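The compatibility and serialization points can be illustrated with a small standalone sketch. It redefines WeatherParams locally with one extra field, units, which is hypothetical and not part of this tutorial, and uses the standard library's json module as a stand-in for the data conversion Temporal performs, which may differ in its details.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class WeatherParams:
    office: str
    gridX: int
    gridY: int
    # Hypothetical field added later; the default keeps older callers working.
    units: str = "us"


# A caller that only knows the original three fields still works unchanged.
params = WeatherParams(office="SEW", gridX=123, gridY=61)

# Round-trip through JSON to show the dataclass serializes cleanly.
payload = json.dumps(asdict(params))
restored = WeatherParams(**json.loads(payload))

print(payload)
print(restored == params)  # True
```

Had the Workflow taken three positional arguments instead, adding a fourth would have changed its signature for every caller.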
With the dataclass in place, you can now focus on defining the Activity itself.
Define the Activity class
Shifting our attention to the core of the execution, Activities can be either functions or methods within a class. In this case, you'll encapsulate them within a class to efficiently manage connections to the weather API.
The WeatherActivities class will contain the Activity get_weather, which is responsible for making the API call and returning the forecast data.
# activities.py
class WeatherActivities:
    def __init__(self):
        # This will force the use of IPv4 and not IPv6 and bypass SSL certificate verification
        connector = TCPConnector(family=socket.AF_INET, ssl=False)
        self.session = aiohttp.ClientSession(connector=connector)

    @activity.defn
    async def get_weather(self, input: WeatherParams) -> list[dict]:
        url = f"https://api.weather.gov/gridpoints/{input.office}/{input.gridX},{input.gridY}/forecast"
        async with self.session.get(url) as response:
            if response.status == 200:
                forecast_data = await response.json()
                periods = forecast_data["properties"]["periods"]
                return periods
            else:
                response_text = await response.text()
                raise Exception(
                    f"Could not retrieve weather data, status code {response.status}, response: {response_text}"
                )

    async def close(self):
        await self.session.close()
Through these methods, the Activity class manages API interactions and data processing. Activities in Temporal can have multiple parameters, and all passed values are recorded in the Workflow's Event History. This structure ensures a clear separation between the execution of tasks (Activities) and their orchestration (Workflow), a fundamental principle in Temporal's architecture.
Using async in Temporal's Activity code is crucial for non-blocking operations, particularly for network calls like API requests.
Asynchronous Activities allow for efficient and scalable handling of multiple tasks, improving overall performance by avoiding idle resource usage during waiting periods.
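The benefit of non-blocking calls can be seen in a small standalone sketch that does not involve Temporal at all. Here asyncio.sleep stands in for waiting on a network response, and fake_api_call is a made-up name for this illustration. Three simulated calls of 0.2 seconds each run concurrently, so the total time is close to 0.2 seconds rather than 0.6.

```python
import asyncio
import time


async def fake_api_call(name: str, delay: float) -> str:
    # asyncio.sleep stands in for waiting on a network response.
    await asyncio.sleep(delay)
    return f"{name} done"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    # While one call is waiting, the event loop is free to run the others.
    results = await asyncio.gather(
        fake_api_call("a", 0.2),
        fake_api_call("b", 0.2),
        fake_api_call("c", 0.2),
    )
    elapsed = time.perf_counter() - start
    return list(results), elapsed


results, elapsed = asyncio.run(main())
print(results)
print(f"elapsed: {elapsed:.2f}s")
```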
Implementing Activities within a class provides organized and maintainable code by encapsulating related functionalities. This approach allows for efficient shared state management across Activity invocations and simplifies adding new functionalities, making the codebase more extensible and manageable.
Forecasts are created at each NWS Weather Forecast Office (WFO) on their own grid definition.
To obtain the grid forecast for a point location, use the /points endpoint to retrieve the current grid forecast endpoint by coordinates:
https://api.weather.gov/points/{latitude},{longitude}
For example: https://api.weather.gov/points/39.7456,-97.0892
This will provide the grid forecast endpoints for three format options in these properties:
- forecast: forecast for 12h periods over the next seven days
- forecastHourly: forecast for hourly periods over the next seven days
- forecastGridData: raw forecast data over the next seven days
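The lookup described above can be sketched as two small helpers that turn a /points response into the URL that get_weather requests. This is a minimal sketch that makes no network call. It assumes the response carries gridId, gridX, and gridY under properties, and the sample payload is hand-written for illustration, so check a real response before relying on these field names or values. The helper names are also made up for this example.

```python
def grid_from_points(points_response: dict) -> tuple[str, int, int]:
    """Extract the office and grid coordinates from a /points response."""
    # Assumed field names; verify them against a live /points response.
    props = points_response["properties"]
    return props["gridId"], props["gridX"], props["gridY"]


def forecast_url(office: str, grid_x: int, grid_y: int) -> str:
    """Build the same gridpoint forecast URL that get_weather requests."""
    return f"https://api.weather.gov/gridpoints/{office}/{grid_x},{grid_y}/forecast"


# Hand-written, trimmed sample for illustration only.
sample_points_response = {
    "properties": {"gridId": "TOP", "gridX": 31, "gridY": 80}
}

office, grid_x, grid_y = grid_from_points(sample_points_response)
print(forecast_url(office, grid_x, grid_y))
```

The three values map directly onto the office, gridX, and gridY fields of WeatherParams.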
Now that the Activity class is defined, it's time to shift our focus to updating the Worker, where these Activities will be executed.
Modify the Worker
In Temporal applications, the Worker plays a crucial role as the execution engine for Workflows and Activities.
It's responsible not only for executing the defined tasks but also for maintaining continuous communication with the Temporal service.
This ensures that Workflows and Activities are handled efficiently and reliably.
In this section, you'll update the Worker to integrate the WeatherActivities and WeatherWorkflow.
This integration is vital for enabling the Worker to recognize and execute the specific tasks associated with our Weather Forecasting Service, thereby bridging the gap between the task definitions and their execution.
Update Worker imports
To start, you need to prepare the Worker.
Update the import statements to incorporate WeatherActivities and WeatherWorkflow into your Worker.
This step ensures that the Worker has access to the necessary components to execute the Weather Forecasting logic.
# run_worker.py
import asyncio
import logging
from temporalio.client import Client
from temporalio.worker import Worker
from activities import WeatherActivities
from workflows import WeatherWorkflow
Now that you have imported the necessary libraries, you can proceed to configure the Worker.
Update the Worker
With the imports in place, focus on configuring the Worker itself. While the core structure of the Worker remains the same, crucial updates are made to instantiate the Activity class and register the method on the Worker.
This allows the Worker to recognize and execute the specific WeatherActivities and WeatherWorkflow when called upon.
# run_worker.py
async def main():
    client = await Client.connect("localhost:7233")
    # Create one instance so the session used by the Worker is the one closed on shutdown
    activities = WeatherActivities()
    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[WeatherWorkflow],
        activities=[activities.get_weather],
    )
    try:
        await worker.run()
    finally:
        await activities.close()


if __name__ == "__main__":
    asyncio.run(main())
Here's an overview of the Worker's updated functionality:
- The WeatherWorkflow is set up to be recognized and executed by the Worker.
- The get_weather Activity from WeatherActivities is similarly registered.
- The Worker listens on the "my-task-queue" to receive and process new tasks.
- It executes the appropriate Workflow or Activity based on the task received from the Temporal service.
Essentially, the Worker functions as the heart of the application's runtime. It continuously listens for and executes tasks, bridging the gap between the Temporal service's scheduling and the actual task execution. This continuous operation persists until the Worker is shut down.
Upon shutdown, an important cleanup process occurs.
The Worker ensures that the WeatherActivities session is properly closed.
This setup also allows for the distribution of work across multiple Workers, ensuring robust handling of tasks in diverse environments.
With the Worker now ready, the next step is to modify the Client, setting the stage for launching and interacting with your Weather Forecasting Service.
Modify the Client
In a Temporal application, the Client plays a pivotal role in initiating and managing Workflow Executions.
It serves as the bridge between the user-facing interface and the backend Workflow logic.
In this section, you'll integrate the Client with a Flask application to start the Workflow and serve both the UI and API from a /weather endpoint.
This setup allows for an interactive and responsive way to interact with the Temporal service, providing an accessible point of contact for users to request and receive weather forecasts.
Update Client imports
The Weather Forecasting Service integrates Temporal's Client class to connect with the Temporal service and uses Flask to serve the UI and API. This configuration facilitates a smooth interaction between the user interface and the Workflow management system.
Change the name of the run_workflow.py file to app.py to help integrate the Flask application with the Client.
While this tutorial uses Flask as the web framework, the Client can be used with any web framework.
Add the following imports to your app.py file.
# app.py
from flask import Flask, render_template
from temporalio.client import Client
from activities import WeatherParams
from workflows import WeatherWorkflow
Now that you have imported the necessary libraries, you can proceed to configure the Client.
Configure the Client
The Client's primary role is to connect to the Temporal service and provide an API for Workflow management. It acts as the intermediary, enabling the Flask application to trigger and control Workflow Executions.
# app.py
app = Flask(__name__)


@app.route("/weather")
async def get_weather():
    client = await Client.connect("localhost:7233")
    weather_params = WeatherParams(office="SEW", gridX=123, gridY=61)
    forecast_data = await client.execute_workflow(
        WeatherWorkflow.run,
        weather_params,
        id="weather-workflow-id",
        task_queue="my-task-queue",
    )
    simplified_forecast = []
    for period in forecast_data:
        period_data = {
            "name": period.get("name"),
            "startTime": period.get("startTime"),
            "endTime": period.get("endTime"),
            "temperature": period.get("temperature"),
            "temperatureUnit": period.get("temperatureUnit"),
            "windSpeed": period.get("windSpeed"),
            "windDirection": period.get("windDirection"),
            "shortForecast": period.get("shortForecast"),
            "detailedForecast": period.get("detailedForecast"),
        }
        simplified_forecast.append(period_data)
    return render_template("weather.html", forecast=simplified_forecast)


if __name__ == "__main__":
    app.run(debug=True)
Here's what happens when a user accesses the /weather endpoint:
- The Flask route /weather is dedicated to processing weather forecast requests.
- It initializes the Temporal Client, preparing for interaction with the Temporal service.
- A WeatherParams object is created, carrying essential data for the weather query.
- The WeatherWorkflow.run is executed via the Client, with the provided parameters.
- Following execution, the Workflow returns weather data, which is then processed and organized.
- Flask's render_template() function then renders the forecast data in a user-friendly format.
Through this process, the Flask app, acting as the frontend client, interacts seamlessly with the Temporal backend. It delegates the execution of Workflows and Activities to the Worker, while it focuses on handling user requests and presenting data.
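The step that processes and organizes the returned weather data is plain Python, so it can be pulled out of the route and exercised on its own. The sketch below is one possible refactoring under that assumption, not part of the tutorial's code. The names FORECAST_FIELDS and simplify_periods are made up for this example, and the sample period is hand-written.

```python
# The fields the route keeps from each forecast period.
FORECAST_FIELDS = (
    "name",
    "startTime",
    "endTime",
    "temperature",
    "temperatureUnit",
    "windSpeed",
    "windDirection",
    "shortForecast",
    "detailedForecast",
)


def simplify_periods(periods: list[dict]) -> list[dict]:
    """Keep only the display fields; missing keys become None, as with dict.get."""
    return [
        {field: period.get(field) for field in FORECAST_FIELDS}
        for period in periods
    ]


# Hand-written sample period with one extra key and several missing ones.
sample = [{"name": "Today", "temperature": 70, "icon": "ignored"}]
simplified = simplify_periods(sample)
print(simplified[0]["name"], simplified[0]["temperature"], simplified[0]["windSpeed"])
```

Keeping this logic in a separate function means it can be covered by a quick unit test without starting Flask or a Worker.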
Following the successful integration of the Flask application with the Temporal backend, it's vital to shift our focus to the testing phase.
Test the Activity
Testing your Temporal application is an essential step to ensure the reliability and correctness of its Workflows and Activities.
This section guides you through writing tests using pytest to confirm the functionality of your Weather Forecasting Service with Temporal's ActivityEnvironment class.
The Activity Environment is used for testing Activity code that can access the functions in the temporalio.activity module.
Use run to run an Activity function within an Activity context.
These tests are crucial for validating both the integration with a mock weather service and the behavior of your Temporal Activities.
Set up the testing environment
The first step in testing is to establish a testing environment that mirrors your production setup as closely as possible. This step is critical for ensuring that your tests accurately reflect the real-world behavior of your application.
The setup involves two main components:
- creating a mock weather service
- configuring pytest fixtures
Create a folder called tests, and inside that folder create a new file called test_activity.py.
Add the following imports to the test_activity.py file.
# test_activity.py
import pytest
from aiohttp import web
from temporalio.testing import ActivityEnvironment
from activities import WeatherActivities, WeatherParams
Now you're ready to begin mocking the Weather Service API.
Mock weather service
The mock weather service is a pivotal part of your testing setup. It replicates the behavior of the external weather API, offering controlled responses to your application. This controlled environment is invaluable for testing how your application responds to different scenarios, such as varying weather conditions or unexpected API failures.
# test_activity.py
async def mock_weather_service(request):
    return web.json_response(
        {
            "properties": {
                "periods": [
                    {"name": "Today", "temperature": 70, "shortForecast": "Sunny"},
                ]
            }
        }
    )
After establishing the mock service, the next step involves setting up pytest fixtures.
Pytest fixtures
These fixtures are instrumental in both initiating and tearing down your testing environment for each test.
By ensuring each test starts with a fresh environment, pytest fixtures maintain the integrity and reliability of your entire test suite.
# test_activity.py
@pytest.fixture
async def start_fake_weather_service():
    app = web.Application()
    app.router.add_get("/gridpoints/{office}/{gridX},{gridY}/forecast", mock_weather_service)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    yield
    await runner.cleanup()
This fixture function is used to set up a fake weather service for testing. It creates a web application, adds a route to handle requests to the weather API endpoint, starts a TCP site, and yields control back to the test function. After the test function completes, it cleans up the resources used by the fake weather service.
Activity test case
Having set up the testing environment, the focus now shifts to crafting test cases. These tests are designed to rigorously validate the behavior of your application's activities.
One key test in your suite is the test_get_weather function. This function specifically tests the get_weather Activity in your WeatherActivities class.
It's a critical test that ensures the activity interacts correctly with the mock weather service and processes the API responses as intended.
# test_activity.py
@pytest.mark.asyncio
@pytest.mark.usefixtures("start_fake_weather_service")
@pytest.mark.parametrize(
    "input, expected_output",
    [
        (
            WeatherParams(office="SEW", gridX=123, gridY=61),
            [
                {"name": "Today", "temperature": 70, "shortForecast": "Sunny"},
            ],
        ),
    ],
)
async def test_get_weather(input, expected_output, weather_activities):
    activity_environment = ActivityEnvironment()
    result = await activity_environment.run(weather_activities.get_weather, input)
    assert result == expected_output
The test_get_weather function evaluates the get_weather method of the WeatherActivities class, ensuring its output aligns with expected results.
This validation is made possible by employing two essential fixtures:
- start_fake_weather_service: This fixture establishes a mock weather service, simulating an external weather API's behavior.
- weather_activities: This fixture supplies a configured instance of the WeatherActivities class for testing purposes. It guarantees that the activities are examined within a stable and isolated environment, distinct from other parts of the application.
These fixtures, by setting up a controlled testing environment, enable the test_get_weather function to effectively validate the functionality and resilience of the get_weather method under various simulated conditions.
Execute tests
Finally, to run your tests, simply execute the pytest command in your project's root directory. This command triggers the discovery and execution of all test cases defined in your test suite, and the results are displayed in your terminal. It's important to carefully review these results, as they will reveal any potential issues or regressions in your application's functionality.
By following these steps, you can thoroughly test your Temporal application, ensuring its reliability and robustness in handling Weather Forecasting Service functionalities.
Run the Temporal Application
To experience your Weather Forecasting Service in action, you'll need to run the Temporal application. This involves starting both the Worker and the Flask app, each in its own terminal. Here's how you can do it:
Start the Worker
First, activate the Worker. This is the component that will execute the Workflows and Activities you've defined. Open a terminal window and run the following command:
python3 run_worker.py
This command initiates the Worker, which will now listen for and execute tasks from the Temporal service.
Start the Flask App
Next, launch the Flask application. The Flask app acts as the front-end interface of your service. Open a separate terminal from the one running the Worker and execute the following command:
flask run
By starting the Flask server, you're setting up the /weather endpoint, which users can interact with to get weather forecasts.
Access the Application
Finally, explore your application. With both the Worker and Flask app running, your Temporal application is fully operational.
Open your web browser and navigate to the following URL:
This URL directs you to the /weather endpoint of your Flask application.
Here, you can see the weather forecasts generated by your Temporal Workflows and Activities, showcasing the integration of your backend logic with the frontend interface.
Conclusion
In this tutorial, you've successfully integrated a web API with a Temporal Application using the Python SDK. You have learned how to set up, build, test, and enhance a Temporal Application by integrating it with a weather forecasting API. This process involved defining a Workflow, creating an Activity, configuring a Worker, and implementing a Flask-based client to interact with the Temporal backend.
By completing this tutorial, you have achieved the following goals:
- Understanding Durable Executions: You now know how to make your application's interactions with external APIs more resilient and reliable using Temporal's Workflow and Activity structures.
- Workflow and Activity Integration: You've learned how to define and orchestrate Workflows and Activities to handle external API calls effectively.
- Client-Worker Communication: You've seen how to configure a Worker to execute Workflow and Activity tasks and how a Flask client can trigger these Workflows.
This tutorial serves as a foundation for building robust, scalable applications with Temporal and Python, capable of handling complex workflows and external service integrations. The skills you've acquired here will be invaluable as you continue to explore and build distributed applications using Temporal.