Creating a Python Package with Poetry for Beginners Part 3

Intro
This is the third part of a blog series. In the previous posts we have addressed: creating a package with Poetry, managing our development environment and adding a function in part one; and package documentation, testing and publishing to PyPI in part two.
In those previous posts, I developed a function for summarising the successes (and failures) of the teams in a fantasy football league. That function makes various API calls which in theory could all be made in parallel to speed up the runtime.
In this blog I aim to parallelise the function get_season_league, which I wrote in the first blog.
Starting Function
Here is the function written in part one:
import requests
import pandas as pd
import json

def get_season_league(league_id="485842"):
    api_url = "https://fantasy.premierleague.com/api/"
    url = api_url + "leagues-classic/" + league_id + "/standings/"
    response = requests.get(url)
    data = json.loads(response.text)
    league = pd.DataFrame(data['standings']['results'])
    df = pd.DataFrame([])
    for index, row in league.iterrows():
        player_query = api_url + "entry/" + str(row['entry']) + "/history"
        player_response = requests.get(player_query)
        player_data = json.loads(player_response.text)
        player_df = pd.DataFrame({
            'name': row['player_name'],
            'team_name': row['entry_name'],
            'event': pd.json_normalize(player_data['current'])['event'],
            'points': pd.json_normalize(player_data['current'])['total_points']
        })
        df = pd.concat([df, player_df])
    return df
The logic is as follows:
- Query the API to get the current league data
- Loop over each member of the league
  - Query the API for the individual player
- Return the relevant data
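Calling the function gives one row per player per game week. The output below is purely illustrative (made-up names and points), just to show the shape of the result:

df = get_season_league()
print(df.head())
#        name team_name  event  points
# 0  Jane Doe   Jane FC      1      58
# 1  Jane Doe   Jane FC      2     121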
As it is currently written, the function runs like any normal for loop: each iteration must finish before the next one starts. But with this code we shouldn't need to wait for the previous API call to complete, as there is no dependency between the player queries. In theory we could run all of the individual player queries at once and the function would be a lot faster.
Measuring function calls in Python
We can measure how long it takes to run a piece of Python code using the time module. For example, measuring my get_season_league function:
import time
from get_league import get_season_league
start_time = time.time()
x = get_season_league()
print("--- %s seconds ---" % (time.time() - start_time))
My function was taking ~3.5 seconds for the default league, which has 13 players and 11 completed game weeks so far. That is an average of roughly 0.27 seconds per player (including the single original API call).
I also tested it on a larger league of 50 people, which seems to take ~13 seconds, though with more variance. That works out to a similar 0.26 seconds per player.
This is why I want to parallelise the function: if the non-dependent API calls could be made all at once, or at least several at a time, the function could be sped up massively. For example, for the league of 50 at 0.26 seconds per player, making two requests at a time could bring it down to ~6.5 seconds, and four at a time to ~3.25 seconds. These values are approximate, but hopefully you can see the value of splitting up the parallelisable parts of the workload.
Optimising the Function
Before starting on the asynchronous side, there are a few things we can address first.
iterrows() Alternative
The iterrows() function is pretty inefficient for this use case (and in general). This blog explains why, and why there are better alternatives like itertuples. However, I am just going to loop over a zip of the values I need.
# Old:
for index, row in league.iterrows():
    player_id = row['entry']
    player_name = row['player_name']
    team_name = row['entry_name']

# New:
for player_id, player_name, team_name in zip(
    league['entry'],
    league['player_name'],
    league['entry_name']
):
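For comparison, the itertuples alternative mentioned above would look roughly like this (my sketch, not the code I ended up using):

# itertuples yields one lightweight named tuple per row
for row in league.itertuples(index=False):
    player_id = row.entry
    player_name = row.player_name
    team_name = row.entry_name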
Concatenating DataFrames
Another area where I can improve the function is to switch away from concatenating DataFrames within the for loop, towards either concatenating once at the end or building a list of dictionaries and converting it to a DataFrame at the end. The reason for this is the way pandas handles DataFrame memory allocation: pd.concat copies its inputs into a brand new DataFrame, so calling it inside the loop re-copies all of the accumulated data on every iteration. There is more detail in this Saturn Cloud blog.
# Old:
df = pd.DataFrame([])
for index, row in league.iterrows():
    player_query = api_url + "entry/" + str(row['entry']) + "/history"
    player_response = requests.get(player_query)
    player_data = json.loads(player_response.text)
    player_df = pd.DataFrame({
        'name': row['player_name'],
        'team_name': row['entry_name'],
        'event': pd.json_normalize(player_data['current'])['event'],
        'points': pd.json_normalize(player_data['current'])['total_points']
    })
    df = pd.concat([df, player_df])
return df

# New:
list_to_df = []
for player_id, player_name, team_name in zip(
    league["entry"], league["player_name"], league["entry_name"]
):
    player_query = api_url + "entry/" + str(player_id) + "/history"
    player_response = requests.get(player_query)
    player_data = json.loads(player_response.text)
    player_df = pd.DataFrame({
        'name': player_name,
        'team_name': team_name,
        'event': pd.json_normalize(player_data['current'])['event'],
        'points': pd.json_normalize(player_data['current'])['total_points']
    })
    list_to_df.append(player_df)
df = pd.concat(list_to_df, ignore_index=True)
return df
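For completeness, the other option mentioned above, building a list of dictionaries and creating the DataFrame once at the end, would look something like this sketch (it indexes the raw JSON directly rather than going through pd.json_normalize):

# Alternative: collect plain records, build the DataFrame once at the end
records = []
for player_id, player_name, team_name in zip(
    league["entry"], league["player_name"], league["entry_name"]
):
    player_query = api_url + "entry/" + str(player_id) + "/history"
    player_data = json.loads(requests.get(player_query).text)
    for game_week in player_data['current']:
        records.append({
            'name': player_name,
            'team_name': team_name,
            'event': game_week['event'],
            'points': game_week['total_points'],
        })
df = pd.DataFrame(records)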
These changes do seem to have sped up the function by a few seconds (for the league of 50), but the bulk of the time is taken by the API queries, so these best practices won't speed things up dramatically. They are worth implementing nevertheless.
Asynchronising the Code
Before I start on this section I will give a brief background on asynchronous programming, but if you want more detail please read this blog.
There are two main routes we can go down here:
- concurrent.futures.ThreadPoolExecutor will use multiple threads, so the code is technically still synchronous; it will just be running at the same time on different threads. This will be easier to implement with the current code, however the time gains won't scale as far as the alternative, and this approach will use more computational resources as we'll need the additional threads.
- asyncio will use single-threaded multi-tasking: truly asynchronous code. The syntax is more complex and doesn't integrate very well with my current function; for example, I would need to replace requests with aiohttp (see the sketch below). This would definitely be the better option if I was making lots of API calls, but on a smaller scale the gains wouldn't be as significant.
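To give a flavour of the asyncio route (which I am not taking here), the player queries would end up looking something like this with aiohttp. Treat it as a rough sketch against the same endpoint rather than tested code:

import asyncio
import aiohttp

async def fetch_player_history(session, api_url, player_id):
    # aiohttp replaces requests so the call doesn't block the event loop
    async with session.get(api_url + "entry/" + str(player_id) + "/history") as response:
        return await response.json()

async def fetch_all_players(api_url, player_ids):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_player_history(session, api_url, pid) for pid in player_ids]
        # gather schedules every request at once and waits for them all
        return await asyncio.gather(*tasks)

# player_histories = asyncio.run(fetch_all_players(api_url, league['entry']))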
concurrent.futures.ThreadPoolExecutor
For this blog I will be going with concurrent.futures.ThreadPoolExecutor, as it integrates nicely with my existing code and the bigger gains from asyncio won't really suit my use case.
The first thing I need to do (which could have been done earlier) is extract the per-player logic into a separate function. This function will take a player's details, use the player ID to query the API and grab the player's season data. It will then return it neatly as a DataFrame.
def get_player_data(player_info, api_url):
    """Fetch data for a single player and return as DataFrame"""
    player_id = player_info['entry']
    player_name = player_info['player_name']
    team_name = player_info['entry_name']
    player_query = api_url + "entry/" + str(player_id) + "/history"
    player_response = requests.get(player_query)
    player_data = json.loads(player_response.text)
    # Create DataFrame for this player
    player_df = pd.DataFrame({
        'name': player_name,
        'team_name': team_name,
        'event': pd.json_normalize(player_data['current'])['event'],
        'points': pd.json_normalize(player_data['current'])['total_points']
    })
    return player_df
I will also need to adapt how I iterate over the player data. I know I've already switched from iterrows to a for loop over a zip of the relevant data, but the new function will use a different method of iteration. So I am creating a list of 'records' dictionaries, one per player, which I can then pass directly to my new get_player_data function.
players = league[['entry', 'player_name', 'entry_name']].to_dict('records')
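to_dict('records') returns one dictionary per row, so players looks something like this (the values here are made up):

print(players[:2])
# [{'entry': 123456, 'player_name': 'Jane Doe', 'entry_name': 'Jane FC'},
#  {'entry': 234567, 'player_name': 'John Smith', 'entry_name': 'Smith XI'}]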
Next comes the ThreadPoolExecutor; this is what allows us to run multiple API calls at once, by creating and sending work to other Python threads (workers). I will first initialise an empty list to collect my player DataFrames. Then I'll use ThreadPoolExecutor(max_workers=10) to create 10 workers that we can send work to (I am using 10 as an example; this will be an argument the user can change in the final function). executor is the object used to send work to the new threads, and I can use executor.map to map get_player_data over the players list and save the output to our initialised list.
from concurrent.futures import ThreadPoolExecutor

def get_season_league(league_id="485842"):
    # ...
    player_dfs = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = executor.map(get_player_data, players)
        player_dfs = list(results)
Finally, we apply the change mentioned above and call pd.concat a single time, rather than n times:
df = pd.concat(player_dfs, ignore_index=True)
So our final function will look like this, with get_player_data defined inside get_season_league so that api_url is available:
import requests
import pandas as pd
import json
from concurrent.futures import ThreadPoolExecutor

def get_season_league(league_id="485842", max_workers=10):
    api_url = "https://fantasy.premierleague.com/api/"
    url = api_url + "leagues-classic/" + league_id + "/standings/"
    response = requests.get(url)
    data = json.loads(response.text)
    league = pd.DataFrame(data['standings']['results'])

    def get_player_data(player_info):
        """Fetch data for a single player and return as DataFrame"""
        player_id = player_info['entry']
        player_name = player_info['player_name']
        team_name = player_info['entry_name']
        player_query = api_url + "entry/" + str(player_id) + "/history"
        player_response = requests.get(player_query)
        player_data = json.loads(player_response.text)
        # Create DataFrame for this player
        player_df = pd.DataFrame({
            'name': player_name,
            'team_name': team_name,
            'event': pd.json_normalize(player_data['current'])['event'],
            'points': pd.json_normalize(player_data['current'])['total_points']
        })
        return player_df

    players = league[['entry', 'player_name', 'entry_name']].to_dict('records')
    player_dfs = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(get_player_data, players)
        player_dfs = list(results)
    df = pd.concat(player_dfs, ignore_index=True)
    return df
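Since max_workers is now an argument, callers can tune it. For example (the league ID shown is the same default as before):

# more workers means more simultaneous requests, with diminishing
# returns once the worker count approaches the league size
df = get_season_league(league_id="485842", max_workers=20)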
When I run the function on the league of 50, it now takes ~1.5 seconds rather than the original ~13 seconds.
Summary
So we've optimised the function to a good degree, first with a few adjustments to the original function and then by using multiple threads to run the API calls at the same time. There are still some things left on the table, like using asyncio instead, or using executor.submit() to get more control over the individual player queries (handling errors, etc.). So perhaps in a future blog we will look at speeding the function up a little bit more.
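As a small taste of that last idea, an executor.submit() version might look something like this: a rough, untested sketch in which each player query can fail independently without crashing the whole run.

from concurrent.futures import ThreadPoolExecutor, as_completed

player_dfs = []
with ThreadPoolExecutor(max_workers=10) as executor:
    # submit returns one Future per player, unlike map which hides them
    futures = {executor.submit(get_player_data, player): player for player in players}
    for future in as_completed(futures):
        try:
            player_dfs.append(future.result())
        except Exception as exc:
            # skip a player whose request failed rather than losing the whole league
            print(f"Query for {futures[future]['player_name']} failed: {exc}")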
