Creating N1QL Labelled Image Database using Couchbase, FastAPI, and SerpApi

This is a part of the series of blog posts related to Artificial Intelligence Implementation. If you are interested in the background of the story or how it goes:

In previous weeks we explored how to automatically build your own image dataset with SerpApi's Google Images Scraper API, and used those images to train a network through a simple command object passed to FastAPI. This week we will improve the database creation method by using Couchbase as a storage server, and show how to fetch a random element from a given subset.

Couchbase Configuration

We will be needing Couchbase Community server, and Python SDK for Couchbase for this project. You can find relevant information on Couchbase Docs.

For this tutorial we will be using Couchbase version 7.1.0 for Debian 11:

You can access it from this link

Once you have installed it and defined a username and password at the server address (http://kagermanov:8091 in my case), you will be greeted with a dashboard like this:

This means you have successfully deployed the server.
For those of you who want to stop the background process on Linux, you can type sudo systemctl stop couchbase-server to stop the server at your will.

Head to Buckets on the left hand menu and add a new bucket called images from ADD BUCKET button:

Make sure to choose a RAM quota that won't overwhelm your local system.

Now, you need to add a scope and a collection within this bucket via the Scopes & Collections button:

Add a scope named image, and within it, a collection named labelled_image:

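Next, head to the playground, where you can run queries manually, and create a primary index on the collection so that the N1QL queries used later can run against it:

CREATE PRIMARY INDEX ON `images`.image.labelled_image;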

Lastly, make sure you install the Couchbase Python SDK via pip, and everything is set for our server.

Automatic Image Collector

Let's create a separate file within our project called add_couchbase.py. This will be the refactored version of add.py, which automatically gathered images for a given query.
Here are the requirements for it:

from couchbase.options import (ClusterOptions, ClusterTimeoutOptions, QueryOptions)
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from serpapi import GoogleSearch
from datetime import timedelta
from pydantic import BaseModel
import mimetypes
import requests
import random
import uuid
import urllib
import base64

To break them down by their specific use case:

| Requirement | Explanation |
|---|---|
| couchbase.options.ClusterOptions | For defining timeout and authentication options of the storage cluster connection |
| couchbase.options.ClusterTimeoutOptions | For the maximum time allowed to perform an operation in the cluster |
| couchbase.options.QueryOptions | For giving positional parameters within a N1QL query to Couchbase Server |
| couchbase.auth.PasswordAuthenticator | For authenticating the connection with the storage server |
| couchbase.cluster.Cluster | For the connection to the storage server |
| serpapi.GoogleSearch | For scraping links of images from Google Images |
| datetime.timedelta | For defining a time interval |
| pydantic.BaseModel | For defining objects to be passed as parameters |
| mimetypes | For guessing the extension of the image |
| requests | For making GET requests to image links |
| random | For creating a random number within an interval to fetch a random image from the storage database |
| uuid | For creating a unique identifier for each image |
| base64 | For converting image byte data to base64 for storing purposes |

Let's define a pydantic base model for an image to be uploaded to the Couchbase Storage Server:

class Document(BaseModel):
	type: str = "image"
	id: str
	classification: str
	base64: str
	uri: str

id is a unique uuid for the image, so that a specific image can be fetched manually in the future.
classification is the query given to SerpApi's engine.
base64 is the string representation of the image, from which the image can be recreated within a training session.
uri is the URL we fetched the image from.
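
To make the base64 field a bit more concrete, here is a minimal sketch of the round trip using only the standard library. The helper names encode_image and decode_image are illustrative and not part of the project, and the sample bytes are just the PNG file signature standing in for a downloaded image.

```python
import base64


def encode_image(raw: bytes) -> str:
    """Turn raw image bytes into an ASCII string that fits in a JSON document."""
    return base64.b64encode(raw).decode("ascii")


def decode_image(encoded: str) -> bytes:
    """Recover the original bytes, e.g. before handing them to an image loader."""
    return base64.b64decode(encoded)


# Stand-in for the content of a downloaded image (the PNG file signature).
sample = b"\x89PNG\r\n\x1a\n"
encoded = encode_image(sample)
restored = decode_image(encoded)
print(encoded)
print(restored == sample)
```

Keep in mind that base64 text is roughly a third larger than the raw bytes, which is the price paid for being able to store the image inside a JSON document.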

Let's initialize our Storage Database in a class:

class ImagesDataBase:
	def __init__(self):
		username = "<Your Couchbase Username>"
		password = "<Your Couchbase Password>"
		bucket_name = "images"
		auth = PasswordAuthenticator(
				username,
				password
		)
		timeout_opts = ClusterTimeoutOptions(kv_timeout=timedelta(seconds=10))
		self.cluster = Cluster('couchbase://localhost', ClusterOptions(auth, timeout_options=timeout_opts))
		self.cluster.wait_until_ready(timedelta(seconds=5))
		cb = self.cluster.bucket(bucket_name)
		self.cb_coll = cb.scope("image").collection("labelled_image")
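
Hard-coding the credentials is fine for a local experiment, but a small helper could read them from the environment instead. This is only a sketch under assumptions: the variable names COUCHBASE_USERNAME, COUCHBASE_PASSWORD and COUCHBASE_HOST are hypothetical, not something Couchbase or this project defines, and the function name load_couchbase_settings is made up for illustration.

```python
import os
from typing import Mapping, Optional


def load_couchbase_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect connection settings from environment variables (hypothetical names)."""
    env = os.environ if env is None else env
    try:
        username = env["COUCHBASE_USERNAME"]
        password = env["COUCHBASE_PASSWORD"]
    except KeyError as missing:
        raise RuntimeError(
            "Missing environment variable: {}".format(missing.args[0])
        ) from None
    # Fall back to the local server used throughout this post.
    host = env.get("COUCHBASE_HOST", "localhost")
    return {
        "username": username,
        "password": password,
        "connection_string": "couchbase://{}".format(host),
        "bucket_name": "images",
    }


# A plain dict stands in for os.environ here so the example runs anywhere.
settings = load_couchbase_settings(
    {"COUCHBASE_USERNAME": "admin", "COUCHBASE_PASSWORD": "secret"}
)
print(settings["connection_string"])
```

The returned values could then be passed to PasswordAuthenticator and Cluster in place of the literals in __init__.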

Here's the function for inserting an image to the Storage server with a unique id:

	def insert_document(self, doc: Document):
		doc = doc.dict()
		print("\nInsert CAS: ")
		try:
			key = doc["type"] + "_" + str(doc["id"])
			result = self.cb_coll.insert(key, doc)
			print(result.cas)
		except Exception as e:
			print(e)

doc in this context represents the Document object we store the image inside to be uploaded to the Couchbase Server.
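
To illustrate how the key is put together and why insert is a reasonable choice here, the following sketch uses an in-memory stand-in for the collection. InMemoryCollection and make_key are illustrative names, not part of the Couchbase SDK; the stand-in only mimics the insert-only behaviour of refusing to overwrite an existing key, and the URL is made up.

```python
import uuid


def make_key(doc: dict) -> str:
    """Build the document key the same way insert_document does: '<type>_<id>'."""
    return doc["type"] + "_" + str(doc["id"])


class InMemoryCollection:
    """Illustrative stand-in for a collection; this is not the Couchbase SDK."""

    def __init__(self):
        self._docs = {}

    def insert(self, key: str, value: dict) -> None:
        # Insert-only semantics: refuse to overwrite an existing key.
        if key in self._docs:
            raise KeyError("document {!r} already exists".format(key))
        self._docs[key] = value

    def get(self, key: str) -> dict:
        return self._docs[key]


doc = {
    "type": "image",
    "id": uuid.uuid1().hex,
    "classification": "Mango",
    "base64": "",
    "uri": "https://example.com/mango.jpg",  # made-up URL
}
coll = InMemoryCollection()
key = make_key(doc)
coll.insert(key, doc)
print(key.startswith("image_"), len(key))
```

Since every id comes from a fresh uuid, key collisions should be rare in practice; the check mainly protects against inserting the same Document object twice.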

Let's have another function to call an image by its unique key we will generate. This function will not be used in the context of this blog post.

	def get_image_by_key(self, key):
		print("\nGet Result: ")
		try:
			result = self.cb_coll.get(key)
			print(result.content_as[str])
		except Exception as e:
			print(e)

Next, we need to build a function that helps us upload only unique images. The differentiator will be the unique link within the scope of a classification:

	def check_if_it_exists(self, link, cs):
		try:
			sql_query = 'SELECT uri FROM `images`.image.labelled_image WHERE classification = $1 AND uri = $2'
			row_iter = self.cluster.query(
				sql_query,
				QueryOptions(positional_parameters=[cs, link]))
			for row in row_iter:
				return row
		except Exception as e:
			print(e)

This function takes link, which is the link to the image, and cs, which is the classification of the image. If no image with the same link exists under that classification in our storage, it returns None. There are two reasons we don't check uniqueness against the entire database. First, it wouldn't be efficient in the long run. Second, the same image could belong to different classifications. Imagine the logo of Apple, the company. It is also an apple, the fruit. If we are classifying between Apple Logo and Blackberry Logo, and the image is stored under the Apple classification only, there is a chance that the model could fail to interpret it. This approach might create unnoticeable duplicate images with different classifications, but in the long run it should prove useful.
Here's an example of the same query written out manually, for an image we know already exists in the Couchbase Server:

SELECT uri FROM `images`.image.labelled_image WHERE classification = 'Pomegrenate' AND uri = 'https://i0.wp.com/post.healthline.com/wp-content/uploads/2022/02/pomegranate-seeds-fruit-1296x728-header.jpg?w=1155&h=1528'
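
The idea of scoping uniqueness to a classification can be sketched without a database at all. The class below is an in-memory illustration only (SeenImages is a made-up name, and the URL is invented); it treats the pair of classification and link as the identity of an image, which is what the WHERE clause above does.

```python
class SeenImages:
    """Tracks (classification, link) pairs already stored; in-memory stand-in."""

    def __init__(self):
        self._seen = set()

    def exists(self, link: str, cs: str) -> bool:
        return (cs, link) in self._seen

    def add(self, link: str, cs: str) -> bool:
        """Record the pair; return True if it was new, False if it was a duplicate."""
        if self.exists(link, cs):
            return False
        self._seen.add((cs, link))
        return True


seen = SeenImages()
logo = "https://example.com/apple-logo.png"  # made-up URL
first = seen.add(logo, "Apple")              # new pair
second = seen.add(logo, "Apple")             # same link, same classification
third = seen.add(logo, "Apple Logo")         # same link, different classification
print(first, second, third)
```

The same link is rejected the second time under Apple, but accepted under Apple Logo, which is the behaviour described above.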

Now that we have uniqueness out of the way, let us focus on picking a random image for a given classification. This part will not be used in this blog post either; it is here for future use. The following function returns the number of images stored under a classification, which gives us the upper bound for picking a random index.

	def get_max_image_size(self, cs):
		try:
			sql_query = 'SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = $1'
			row_iter = self.cluster.query(
				sql_query,
				QueryOptions(positional_parameters=[cs]))
			for row in row_iter:
				return row
		except Exception as e:
			print(e)

Here's an example query for the number of Orange images in the storage server:

SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = 'Orange'

Now, let's define a function that randomly picks an image for us. The random integer is generated outside the query, using the count from the previous function as its upper bound, and is then passed to the query as an index:

	def random_lookup_by_classification(self, cs):
		max_size = self.get_max_image_size(cs)['max_items']
		random_number = random.randint(0,max_size - 1)
		print("\nLookup Result: ")
		try:
			sql_query = 'SELECT (SELECT im.base64 FROM `images`.image.labelled_image AS im WHERE im.classification = $1)[$2]'
			row_iter = self.cluster.query(
				sql_query,
				QueryOptions(positional_parameters=[cs, random_number]))
			for row in row_iter:
				return row
		except Exception as e:
			print(e)

Here is an example query with the random number 37, which is between 0 and 103 (there are 104 Orange images):

SELECT (SELECT im.base64 FROM `images`.image.labelled_image AS im WHERE im.classification = 'Orange')[37]
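
One edge case worth guarding against is a classification with no images yet: random.randint(0, -1) raises a ValueError. Below is a minimal sketch of the index selection on its own, assuming the count arrives as a row shaped like the result of the COUNT(*) query shown earlier. The helper name pick_random_index is hypothetical.

```python
import random
from typing import Optional


def pick_random_index(count: int) -> Optional[int]:
    """Return a random index in [0, count - 1], or None when there is nothing to pick."""
    if count <= 0:
        # random.randint(0, -1) would raise ValueError, so bail out early.
        return None
    return random.randint(0, count - 1)


# Shape of the row returned by the COUNT(*) query; 104 is the Orange example.
row = {"max_items": 104}
index = pick_random_index(row["max_items"])
empty = pick_random_index(0)
print(index, empty)
```

The returned index can then be passed as the second positional parameter of the lookup query, and a None result can be treated as "no images for this classification".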

We have everything we need for this week's and the coming week's blog posts now. Let's redefine what we defined in earlier posts, starting with the pydantic model for the Query object we pass to the endpoint:

class Query(BaseModel):
	google_domain: str = "google.com"
	num: str = "100"
	ijn: str = "0"
	q: str
	api_key: str  # You may replace this with `api_key: str = "Your API Key"`

Again, the API key mentioned here is your unique API key for SerpApi. It can be accessed via the API Key page.

Here's the redefinition of the Download class. We can omit some of the old parts, since uniqueness is now handled through the database, and add new ones such as the database object.

class Download:
	def __init__(self, query: Query, db: ImagesDataBase):
		self.db = db
		self.query = query
		self.results = []

There is no change in the function of SerpApi's Google Images API implementation. However, let me restate one amazing fact again. If the query you are searching is cached, you can get it free of charge.

	def serpapi_search(self):
		params = {
			"engine": "google",
			"ijn": self.query.ijn,
			"q": self.query.q,
			"google_domain": self.query.google_domain,
			"tbm": "isch",
			"num": self.query.num,
			"api_key": self.query.api_key
		}

		search = GoogleSearch(params)
		results = search.get_dict()
		results = results['images_results']
		self.results = results = [x['original'] for x in results]
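
The last two lines assume that the response contains an images_results list and that every entry has an original link. A slightly more defensive version of that extraction could look like the sketch below. The function name extract_original_links is hypothetical, and the response fragment is made up for illustration; real responses carry many more fields.

```python
def extract_original_links(results: dict) -> list:
    """Collect the 'original' URL of every image result, skipping incomplete entries."""
    images = results.get("images_results", [])
    return [item["original"] for item in images if item.get("original")]


# Made-up response fragment, only to exercise the function.
fake_response = {
    "images_results": [
        {"position": 1, "original": "https://example.com/mango-1.jpg"},
        {"position": 2},  # entry without an 'original' key
        {"position": 3, "original": "https://example.com/mango-3.png"},
    ]
}
links = extract_original_links(fake_response)
no_links = extract_original_links({})  # response without images_results
print(links)
print(no_links)
```

With this shape, a response that has no image results simply yields an empty list instead of raising a KeyError.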

Let's define another function for downloading an image and returning it as a Document object:

	def get_document(self, link):
		print("Downloading {}".format(link))
		classification = self.query.q
		r = requests.get(link)
		base64_str = base64.b64encode(r.content).decode('ascii')
		extension = mimetypes.guess_extension(r.headers.get('content-type', '').split(';')[0])
		id = uuid.uuid1().hex
		if extension == ".jpg" or extension == ".jpeg" or extension == ".png":
			doc = Document(id = id, classification = classification, base64 = base64_str, uri = link )
			return doc
		else:
			return None
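
The extension check in get_document can also be looked at in isolation. The sketch below separates it into two small helpers, using only the standard library; the names extension_from_content_type, is_supported_image and ALLOWED_EXTENSIONS are made up for this illustration. Note that the extension mimetypes returns for image/jpeg can differ between Python versions, which is why the set lists more than one spelling.

```python
import mimetypes
from typing import Optional

# Same extensions that get_document accepts.
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png"}


def extension_from_content_type(content_type: str) -> Optional[str]:
    """Strip parameters such as '; charset=...' and map the MIME type to an extension."""
    mime_type = content_type.split(";")[0].strip().lower()
    return mimetypes.guess_extension(mime_type)


def is_supported_image(content_type: str) -> bool:
    """True when the Content-Type header maps to one of the allowed extensions."""
    return extension_from_content_type(content_type) in ALLOWED_EXTENSIONS


png_ok = is_supported_image("image/png")
html_ok = is_supported_image("text/html; charset=utf-8")
missing_ok = is_supported_image("")  # header absent, as with r.headers.get(..., '')
print(png_ok, html_ok, missing_ok)
```

Running the check on the header before decoding the body would also make it possible to skip the base64 step for responses that are not images.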

Next, we define the function to insert the Document objects we get from the previous function. We check for the uniqueness of the link to reduce duplicates in this function also:

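	def move_to_db(self, link):
		# check_if_it_exists expects (link, classification), in that order
		sameness = self.db.check_if_it_exists(link, self.query.q)
		if sameness is not None:
			return  # already stored under this classification, skip the download
		doc = self.get_document(link)
		if doc is not None:
			self.db.insert_document(doc=doc)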

Here, we can iterate through all the links gathered from SerpApi's Google Images Scraper API, and upload them to our Couchbase Storage Server:

	def move_all_images_to_db(self):
		self.serpapi_search()
		for result in self.results:
			try:
				self.move_to_db(result)
			except Exception as e:
				# Skip images that fail to download or insert, but report them
				print("\n Passed image: {}".format(e))

Now that we have everything in place, let us import the classes from add_couchbase.py into our main.py and define an endpoint for them:

from fastapi import FastAPI
from add_couchbase import Download, Query, ImagesDataBase
from create import CSVCreator, ClassificationsArray
from dataset import CustomImageDataLoader, CustomImageDataset
from train import CNN, Train
from commands import TrainCommands

app = FastAPI()

@app.get("/")
def read_root():
  return {"Hello": "World"}

@app.post("/add_to_db/")
def create_query(query: Query):
  db = ImagesDataBase()
  serpapi = Download(query, db)
  # move_all_images_to_db() calls serpapi_search() itself, so one call is enough
  serpapi.move_all_images_to_db()
  return {"status": "Complete"}

...

Collecting Images and Storing Them with Classifications

Let's put everything we made into practice. Run the server with the following command:

uvicorn main:app --host 0.0.0.0 --port 8000

and then head to localhost:8000/docs to try out the /add_to_db/ endpoint with the following request body:

{
  "google_domain": "google.com",
  "num": "100",
  "ijn": "0",
  "q": "string",
  "api_key": "<Your API Key>"
}
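
If you would rather call the endpoint from a script than from the docs page, a request along these lines should work. This is a sketch using only the standard library; the function name build_add_request is made up, and the base URL assumes the server is running locally on port 8000 as in the uvicorn command above. The request is only built here; the lines that send it are left commented out.

```python
import json
import urllib.request


def build_add_request(q: str, api_key: str,
                      base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Prepare (but do not send) a POST request to the /add_to_db/ endpoint."""
    body = {
        "google_domain": "google.com",
        "num": "100",
        "ijn": "0",
        "q": q,
        "api_key": api_key,
    }
    return urllib.request.Request(
        url=base_url + "/add_to_db/",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_add_request("Mango", "<Your API Key>")
print(request.get_method(), request.full_url)

# With the server running, sending it would look like this:
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```

Here q is set to Mango, which becomes the classification label of every image stored by that call.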

If you observe the terminal, you will see that the process of updating the database is happening in real time:

...
Insert CAS: 
1655331991397793792
Downloading https://target.scene7.com/is/image/Target/GUEST_c3800365-97f3-4fe9-8061-8894a378cc85?wid=488&hei=488&fmt=pjpeg

Insert CAS: 
1655331991890821120
Downloading https://solidstarts.com/wp-content/uploads/Mango_edited-scaled.jpg

Insert CAS: 
1655331993304432640
Downloading https://www.netmeds.com/images/cms/wysiwyg/blog/2019/04/Raw_Mango_898.jpg
...

If we query the database before the process has finished, we can see that entries with the classification label Mango are being added. Here's the query for it:

SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = 'Mango'


It has already added 67 unique images to the database, which we can use to train our network in the coming weeks.

Conclusion

Databases queried with N1QL, such as Couchbase, can offer fast response times. For that reason, I considered refactoring this part an essential step before taking any further action. This implementation should give us the speed and scalability we need for the coming weeks' challenges in comparing different approaches to image classification. It also matters for handling some operations asynchronously, such as inserting images into a database instead of naming and storing them as files through the OS.