Intro

In the previous blog post, async requests with SerpApi and Python, we covered how to make async requests and how they work. In this follow-up blog post, we'll cover how to add pagination to async requests.

📌 Note: This blog post does not cover multithreading or asyncio. Those topics will be covered in the next blog post.


Subject of test: YouTube Search Engine Results API.

Test includes: 50 search queries, pagination for each query, and data extraction. The sync and async runs used different search queries.

Where the test happened: Replit's Boosted repls.

Hardware used: 4 vCPUs and 4 GB of RAM on Ubuntu 20.04.2 LTS.

Time Comparison

Async requests with pagination finished about 4.3 times faster: the sync run's total elapsed time was a whopping 434% of the async run's 😵😍

Time was recorded using $ time python <file.py>:

Type     Sync requests pagination  Async requests pagination  Sync / Async ratio
user     135.98s                   149.98s                    0.91x (90.67%)
system   20.43s                    13.43s                     1.52x (152.12%)
elapsed  2h 36m 25s                36m 2s                     4.34x (434.09%)
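
The elapsed-time comparison can be double-checked with a few lines of arithmetic. This is only a small sketch that reuses the two elapsed times reported above; the variable names are made up for illustration:

```python
from datetime import timedelta

# Elapsed times reported by `time python <file.py>` in the comparison above
sync_elapsed = timedelta(hours=2, minutes=36, seconds=25)
async_elapsed = timedelta(minutes=36, seconds=2)

# Dividing one timedelta by another gives a plain float
speedup = sync_elapsed / async_elapsed
time_saved = 1 - async_elapsed / sync_elapsed

print(f'speedup: {speedup:.2f}x')                # speedup: 4.34x
print(f'elapsed time saved: {time_saved:.1%}')   # elapsed time saved: 77.0%
```

In other words, the sync run took about 4.34 times as long as the async run, which is the same as saying the async run cut elapsed time by roughly 77%.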

Sync Pagination

You can check the code example in the online IDE:

from serpapi import YoutubeSearch
from urllib.parse import (parse_qsl, urlsplit)
import os, json

# shortened for example
queries = [
    'tent',
    'friendly',
    'ripe',
    'helpful',
    'analyze',
    'pack'
]

data = []

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
    }

    search = YoutubeSearch(params)        # where data extraction happens

    while True:
        results = search.get_dict()       # JSON -> Python dict

        if 'error' in results:
            print(results['error'])
            break

        for result in results.get('video_results', []):
            data.append({
                'title': result.get('title'),
                'link': result.get('link'),
                'channel': (result.get('channel') or {}).get('name'),  # tolerate a missing channel
            })
        
        if 'next' in results.get('serpapi_pagination', {}):
            search.params_dict.update(dict(parse_qsl(urlsplit(results.get('serpapi_pagination', {}).get('next')).query)))
        else:
            break

# print(json.dumps(data, indent=2, ensure_ascii=False))

Sync Pagination Explanation

Import libraries:

from serpapi import YoutubeSearch
from urllib.parse import (parse_qsl, urlsplit) # for pagination
import os, json

Create a list of search queries we want to search:

queries = [
    'burly',
    'silk',
    'monkey',
    'abortive',
    'hot'
]

(optional) Create a temporary list that will store extracted data:

data = []

Add a for loop to iterate over all queries, create SerpApi YouTube search parameters, and pass them to YoutubeSearch, which will make the request to SerpApi. Inside an infinite while loop, transform the returned JSON to a Python dict via the get_dict() method:

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
    }

    search = YoutubeSearch(params)        # where data extraction happens

    while True:
        results = search.get_dict()       # JSON -> Python dict

Check for errors, then iterate over video results and extract the needed data to the temporary list:

        if 'error' in results:
            print(results['error'])
            break

        for result in results.get('video_results', []):
            data.append({
                'title': result.get('title'),
                'link': result.get('link'),
                'channel': (result.get('channel') or {}).get('name')  # tolerate a missing channel
            })

After that, check for the 'next' key in 'serpapi_pagination' and update the search parameters with the parameters of the next page, or exit the infinite loop if there are no more pages left:

        if 'next' in results.get('serpapi_pagination', {}):
            search.params_dict.update(dict(parse_qsl(urlsplit(results.get('serpapi_pagination', {}).get('next')).query)))
        else:
            break
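
To make the pagination step easier to follow, here is a small standalone sketch of what the urlsplit() and parse_qsl() combination does to a 'next' URL. The URL and its sp token below are made up for illustration and are not real SerpApi output:

```python
from urllib.parse import parse_qsl, urlsplit

# Hypothetical 'next' URL; the 'sp' token value is a placeholder
next_url = 'https://serpapi.com/search.json?engine=youtube&search_query=tent&sp=EXAMPLE_TOKEN'

# urlsplit() isolates the query string, parse_qsl() turns it into (key, value) pairs
next_params = dict(parse_qsl(urlsplit(next_url).query))

# Current search parameters, as in the sync example (API key left out)
params = {'engine': 'youtube', 'device': 'desktop', 'search_query': 'tent'}

# update() overwrites matching keys and adds the new ones
params.update(next_params)
print(params)
```

Because update() only touches the keys found in the URL, parameters such as 'device' stay as they were while the page token is added on top.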

Async Pagination

You can check the code example in the online IDE:

from serpapi import YoutubeSearch
from urllib.parse import (parse_qsl, urlsplit)
from queue import Queue
import os, json, re

queries = [
    'object',
    'far-flung',
    'gabby',
    'tiresome',
    'scatter',
    'exclusive',
    'wealth'
]

search_queue = Queue()

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
        'async': True,                    # async batch requests
    }

    search = YoutubeSearch(params)       # where data extraction happens
    results = search.get_dict()          # JSON -> Python dict
    
    if 'error' in results:
        print(results['error'])
        break

    print(f"add search to the queue with ID: {results['search_metadata']['id']}")
    search_queue.put(results)

data = []

while not search_queue.empty():
    result = search_queue.get()
    search_id = result['search_metadata']['id']

    print(f'Get search from archive: {search_id}')
    search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed
    
    print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")

    if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
        for video_result in search_archived.get('video_results', []):
            data.append({
                'title': video_result.get('title'),
                'link': video_result.get('link'),
                'channel': (video_result.get('channel') or {}).get('name'),  # tolerate a missing channel
            })
            
        if 'next' in search_archived.get('serpapi_pagination', {}):
            search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))
            
            new_page_result = search.get_dict() # new results from updated (next) page
            search_queue.put(new_page_result)   # add to queue results from updated (next) page
    else:
        print(f'Requeue search: {search_id}')
        search_queue.put(result)
        
# print(json.dumps(data, indent=2))
print('all searches completed')

Async Pagination Explanation

Import libraries:

from serpapi import YoutubeSearch
from urllib.parse import (parse_qsl, urlsplit) # for pagination
from queue import Queue                        # for async requests 
import os, json, re                            # re is used to check the search status

Create a list of search queries you want to search:

queries = [
    'object',
    'far-flung',
    'gabby',
    'tiresome',
    'scatter',
    'exclusive',
    'wealth'
]

Create a Queue that will store all requests that have been sent to SerpApi for processing:

search_queue = Queue()
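
If Queue is new to you, this short sketch shows the three methods the example relies on: put(), get(), and empty(). The search IDs here are hypothetical placeholders:

```python
from queue import Queue

search_queue = Queue()

# Hypothetical search IDs, shaped like the 'search_metadata' used in this post
for search_id in ['id-1', 'id-2', 'id-3']:
    search_queue.put({'search_metadata': {'id': search_id}})

processed = []
while not search_queue.empty():
    item = search_queue.get()                        # items come out in FIFO order
    processed.append(item['search_metadata']['id'])

print(processed)  # ['id-1', 'id-2', 'id-3']
```

Items leave the queue in the order they were added, so a search that gets put back ends up behind the searches that are still waiting.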

Iterate over all queries and create SerpApi YouTube search parameters with the 'async': True parameter present. Check for errors and put() each search in the queue:

for query in queries:
    params = {
        'api_key': os.getenv('API_KEY'),  # serpapi api key
        'engine': 'youtube',              # search engine
        'device': 'desktop',              # device type
        'search_query': query,            # search query
        'async': True,                    # async batch requests
    }

    search = YoutubeSearch(params)       # where data extraction happens
    results = search.get_dict()          # JSON -> Python dict
    
    if 'error' in results:
        print(results['error'])
        break

    print(f"add search to the queue with ID: {results['search_metadata']['id']}")
    search_queue.put(results)

Create a temporary list that will be used to store extracted data from the search archive API:

data = []

Iterate through the queue until it's empty() and get the data from the search archive by search ID:

while not search_queue.empty():
    result = search_queue.get()
    search_id = result['search_metadata']['id']

    print(f'Get search from archive: {search_id}')
    search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed
    
    print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")

Check if the search is either cached or succeeded. If so, extract the needed data.

After extracting the data, check whether the 'next' key is present inside 'serpapi_pagination'. If it is, update the search parameters with the next page's parameters, request that page, and put() the new result in the Queue.

If the search is not ready yet (its status is neither cached nor succeeded), put it back in the queue so it's checked again later. The while loop exits once the queue is empty:

    if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
        for video_result in search_archived.get('video_results', []):
            data.append({
                'title': video_result.get('title'),
                'link': video_result.get('link'),
                'channel': (video_result.get('channel') or {}).get('name')  # tolerate a missing channel
            })

        if 'next' in search_archived.get('serpapi_pagination', {}):
            search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))

            new_page_result = search.get_dict() # new results from updated (next) page
            search_queue.put(new_page_result)   # add to queue results from updated (next) page
    else:
        print(f'Requeue search: {search_id}')
        search_queue.put(result)

print(json.dumps(data, indent=2))
print('all searches completed')
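
The requeue logic can be tried without spending API credits. Below is a minimal offline sketch of the same loop: fetch_archive(), the polls_until_ready dict, and the 'Processing' status are stand-ins invented for this example and are not part of the SerpApi library. It also adds a short sleep before requeuing, which is an optional tweak rather than something the code above does:

```python
import time
from queue import Queue

# Hypothetical backend: how many polls each search needs before it is ready
polls_until_ready = {'a': 0, 'b': 2, 'c': 1}

def fetch_archive(search_id):
    """Fake archive lookup (not a SerpApi call)."""
    if polls_until_ready[search_id] > 0:
        polls_until_ready[search_id] -= 1
        return {'search_metadata': {'id': search_id, 'status': 'Processing'}}
    return {
        'search_metadata': {'id': search_id, 'status': 'Success'},
        'video_results': [{'title': f'video from {search_id}'}],
    }

search_queue = Queue()
for search_id in list(polls_until_ready):
    search_queue.put(search_id)

titles = []
while not search_queue.empty():
    search_id = search_queue.get()
    archived = fetch_archive(search_id)

    if archived['search_metadata']['status'] in ('Cached', 'Success'):
        titles.extend(video['title'] for video in archived.get('video_results', []))
    else:
        time.sleep(0.01)             # brief pause so the loop does not spin at full speed
        search_queue.put(search_id)  # not ready yet, check again later

print(titles)  # ['video from a', 'video from c', 'video from b']
```

Search 'b' needs the most polls, so it finishes last even though it was queued second. This is the point of the queue: a slow search never blocks the ones that are already done.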

Conclusion

As this comparison shows, async requests combined with pagination finished about 4.3 times faster than sync requests combined with pagination (36m 2s versus 2h 36m 25s).

With that said, we strongly recommend using the async parameter if you need to extract data with pagination.

What comes next

In the next blog post we'll cover:

  • how to speed up async requests even more.
