This is part of a series of blog posts on Artificial Intelligence Implementation. If you are interested in the background of the story or how it goes, the earlier posts in the series cover it.
In previous weeks, we explored how to automatically create your own image dataset using SerpApi's Google Images Scraper API, and used those images to automatically train a network with a simple command object passed to FastAPI. This week we will improve the database creation method by using Couchbase as a storage server, and show how to fetch a random element from a given subset.
Couchbase Configuration
We will need Couchbase Community Server and the Couchbase Python SDK for this project. You can find relevant information in the Couchbase Docs.
For this tutorial we will be using Couchbase version 7.1.0 for Debian 11:
You can access it from this link
Once you install it, define a username and password at the server address (http://kagermanov:8091 in my case). You will then be greeted with a dashboard like this:
This means you have successfully deployed the server.
If you want to stop the background process on Linux, you can run sudo systemctl stop couchbase-server at any time.
Head to Buckets in the left-hand menu and add a new bucket called images using the ADD BUCKET button:
Make sure to choose a RAM amount that won't overwhelm your local system.
Now, you need to add a scope and a collection within this bucket via the Scopes & Collections button:
Add a scope named image, and within it, a collection named labelled_image:
Next, head to the playground, where you can run manual queries, and run the following:
CREATE PRIMARY INDEX ON `images`;
Lastly, make sure you install the Couchbase Python SDK via pip, and everything is set for our server.
Automatic Image Collector
Let's create a separate file within our project called add_couchbase.py. This will be the refactored version of add.py, which automatically gathered images for a given query.
Here are the requirements for it:
from couchbase.options import (ClusterOptions, ClusterTimeoutOptions, QueryOptions)
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from serpapi import GoogleSearch
from datetime import timedelta
from pydantic import BaseModel
import mimetypes
import requests
import random
import uuid
import urllib
import base64
To break them down by their specific use case:
Requirement | Explanation |
---|---|
couchbase.options.ClusterOptions | For defining timeout and authentication options of storage cluster connection |
couchbase.options.ClusterTimeoutOptions | For maximum time allowed to perform an operation in the cluster |
couchbase.options.QueryOptions | For giving positional parameters within a N1QL query to Couchbase Server |
couchbase.auth.PasswordAuthenticator | For authenticating connection with the storage server |
couchbase.cluster.Cluster | For the connection to the storage server |
serpapi.GoogleSearch | For Scraping links of images from Google Images |
datetime.timedelta | For defining a time interval |
pydantic.BaseModel | For defining objects to be passed as parameters |
mimetypes | For guessing the extension of the image |
requests | For making GET requests to image links |
random | For creating a random number within an interval to fetch random image from storage database |
uuid | For creating a unique identifier for each image |
base64 | For converting image byte data to base64 for storing purposes |
Let's define a pydantic base model for an image to be uploaded to the Couchbase Storage Server:
class Document(BaseModel):
    type: str = "image"
    id: str
    classification: str
    base64: str
    uri: str
id will be a unique UUID for the image, so that the image can be fetched manually in the future.
classification is the query given to SerpApi's engine.
base64 will be the string representation of the image, from which it can be recreated within a training session.
uri will represent the URL we fetch the image from.
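To make the role of the base64 field concrete, here is a minimal sketch of the round trip using only the standard library. The helper names encode_image and decode_image are hypothetical and not part of the project, and the sample bytes are just the 8-byte PNG signature standing in for a real image.

```python
import base64


def encode_image(raw_bytes: bytes) -> str:
    """Encode raw image bytes as an ASCII base64 string for JSON storage."""
    return base64.b64encode(raw_bytes).decode("ascii")


def decode_image(base64_str: str) -> bytes:
    """Recover the original image bytes from the stored base64 string."""
    return base64.b64decode(base64_str)


# Stand-in bytes for illustration: the 8-byte PNG signature, not a full image.
sample_bytes = b"\x89PNG\r\n\x1a\n"
stored = encode_image(sample_bytes)
restored = decode_image(stored)
```

The string form can sit inside a JSON document, and decoding it should give back exactly the bytes that were downloaded.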
Let's initialize our Storage Database in a class:
class ImagesDataBase:
    def __init__(self):
        username = "<Your Couchbase Username>"
        password = "<Your Couchbase Password>"
        bucket_name = "images"
        auth = PasswordAuthenticator(
            username,
            password
        )
        timeout_opts = ClusterTimeoutOptions(kv_timeout=timedelta(seconds=10))
        self.cluster = Cluster('couchbase://localhost', ClusterOptions(auth, timeout_options=timeout_opts))
        self.cluster.wait_until_ready(timedelta(seconds=5))
        cb = self.cluster.bucket(bucket_name)
        self.cb_coll = cb.scope("image").collection("labelled_image")
Here's the function for inserting an image to the Storage server with a unique id:
    def insert_document(self, doc: Document):
        doc = doc.dict()
        print("\nInsert CAS: ")
        try:
            key = doc["type"] + "_" + str(doc["id"])
            result = self.cb_coll.insert(key, doc)
            print(result.cas)
        except Exception as e:
            print(e)
doc in this context represents the Document object that holds the image to be uploaded to the Couchbase Server.
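As a small illustration of the key that insert_document builds, the sketch below joins the document type and id in the same way. The function name build_key is hypothetical; it only mirrors the string concatenation used in the method and does not touch Couchbase.

```python
import uuid


def build_key(doc_type: str, doc_id: str) -> str:
    """Join the document type and id the same way insert_document does."""
    return doc_type + "_" + str(doc_id)


# uuid1().hex is a 32-character hexadecimal string without dashes.
example_id = uuid.uuid1().hex
example_key = build_key("image", example_id)
```

A key built this way looks like image_ followed by 32 hexadecimal characters, which is the value you would pass when fetching a document by key.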
Let's have another function to call an image by its unique key we will generate. This function will not be used in the context of this blog post.
    def get_image_by_key(self, key):
        print("\nGet Result: ")
        try:
            result = self.cb_coll.get(key)
            print(result.content_as[str])
        except Exception as e:
            print(e)
Next, we need to build a function that helps us upload only unique images. The differentiator will be the link, which must be unique within the scope of a classification:
    def check_if_it_exists(self, link, cs):
        try:
            sql_query = 'SELECT uri FROM `images`.image.labelled_image WHERE classification = $1 AND uri = $2'
            row_iter = self.cluster.query(
                sql_query,
                QueryOptions(positional_parameters=[cs, link]))
            for row in row_iter:
                return row
        except Exception as e:
            print(e)
This function takes link, which is the link to the image, and cs, which is the classification of the image. If an image with the same link does not already exist within that classification in our storage, it returns None. The reason we don't check the entire database for uniqueness is simple. First, it wouldn't be efficient in the long run. Second, the same image could belong to different classifications. Imagine the logo of Apple, the company. It also depicts Apple, the fruit. If we are classifying between Apple Logo and Blackberry Logo, and the image is stored under the Apple classification only, there is a chance that the model could fail to interpret it. This approach might create unnoticeable duplicate images with different classifications, but in the long run it should prove useful.
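The scoping rule above can be sketched in memory, without a database, by treating the pair of classification and link as the unit of uniqueness. The function is_new_image and the example link are hypothetical and only illustrate the rule; the actual check in this post is the N1QL query in check_if_it_exists.

```python
def is_new_image(seen: set, classification: str, uri: str) -> bool:
    """Return True and record the pair if this (classification, uri) is unseen."""
    pair = (classification, uri)
    if pair in seen:
        return False
    seen.add(pair)
    return True


seen_pairs = set()
logo_uri = "https://example.com/apple-logo.png"  # hypothetical link

first = is_new_image(seen_pairs, "Apple", logo_uri)       # new pair
repeat = is_new_image(seen_pairs, "Apple", logo_uri)      # duplicate within "Apple"
other = is_new_image(seen_pairs, "Apple Logo", logo_uri)  # same link, new classification
```

Under this rule the same link is rejected the second time it appears under Apple, but accepted again under Apple Logo.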
Here's an example manual query for an image that we know already exists in the Couchbase Server:
SELECT uri FROM `images`.image.labelled_image WHERE classification = 'Pomegrenate' AND uri = 'https://i0.wp.com/post.healthline.com/wp-content/uploads/2022/02/pomegranate-seeds-fruit-1296x728-header.jpg?w=1155&h=1528'
Now that we have uniqueness out of the way, let us focus on fetching a random image for a given query. This part will also not be used in this blog post; it is here for future purposes. The function below returns the number of images stored under a classification. It is useful for picking a random index that stays within the size of that subset.
    def get_max_image_size(self, cs):
        try:
            sql_query = 'SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = $1'
            row_iter = self.cluster.query(
                sql_query,
                QueryOptions(positional_parameters=[cs]))
            for row in row_iter:
                return row
        except Exception as e:
            print(e)
Here's another example query for the size of Orange Images in the storage server:
SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = 'Orange'
Now, let's define a function that picks a random image for us. We generate the random integer in Python, outside the query we send, and bound it using the count returned by the previous function:
    def random_lookup_by_classification(self, cs):
        # The count query aliases its result as max_items
        max_size = self.get_max_image_size(cs)['max_items']
        # randint is inclusive on both ends, so the last valid index is max_size - 1
        random_number = random.randint(0, max_size - 1)
        print("\nLookup Result: ")
        try:
            sql_query = 'SELECT (SELECT im.base64 FROM `images`.image.labelled_image AS im WHERE im.classification = $1)[$2]'
            row_iter = self.cluster.query(
                sql_query,
                QueryOptions(positional_parameters=[cs, random_number]))
            for row in row_iter:
                return row
        except Exception as e:
            print(e)
Here is an example query with the random number 37, which is between 0 and 103 (from 104 images of Orange):
SELECT (SELECT im.base64 FROM `images`.image.labelled_image AS im WHERE im.classification = 'Orange')[37]
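The index arithmetic behind the lookup can be tried on its own. The sketch below is a hypothetical helper, pick_random_index, that takes the count of a subset and returns a zero-based index. The guard for an empty subset is an addition of this sketch and is not part of the method shown in this post.

```python
import random


def pick_random_index(item_count: int, rng: random.Random) -> int:
    """Pick a zero-based index for a subset holding item_count images."""
    if item_count <= 0:
        raise ValueError("no images stored for this classification")
    # randint includes both endpoints, hence the "- 1" on the upper bound.
    return rng.randint(0, item_count - 1)


rng = random.Random(0)  # seeded only to make the sketch repeatable
indices = [pick_random_index(104, rng) for _ in range(1000)]
```

With 104 images, every index drawn falls between 0 and 103, which matches the range used in the example query.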
We now have everything we need for this week's and the coming week's blog posts. Let's redefine what we have already defined, starting with a pydantic model for the Query object we pass to the endpoint:
class Query(BaseModel):
    google_domain: str = "google.com"
    num: str = "100"
    ijn: str = "0"
    q: str
    api_key: str ## You may replace this with `api_key: str = "Your API Key"`
Again, the API key mentioned here is your unique API key for SerpApi. It can be accessed via the Api Key page.
Here's the redefinition of the Download class. We can omit some parts that were there to keep uniqueness, and add new ones such as the database object.
class Download:
    def __init__(self, query: Query, db: ImagesDataBase):
        self.db = db
        self.query = query
        self.results = []
There is no change in the function of SerpApi's Google Images API implementation. However, let me restate one amazing fact again. If the query you are searching is cached, you can get it free of charge.
    def serpapi_search(self):
        params = {
            "engine": "google",
            "ijn": self.query.ijn,
            "q": self.query.q,
            "google_domain": self.query.google_domain,
            "tbm": "isch",
            "num": self.query.num,
            "api_key": self.query.api_key
        }
        search = GoogleSearch(params)
        results = search.get_dict()
        results = results['images_results']
        # Keep only the links to the original images
        self.results = [x['original'] for x in results]
Let's define another function for downloading an image and returning it as a Document object:
    def get_document(self, link):
        print("Downloading {}".format(link))
        classification = self.query.q
        r = requests.get(link)
        base64_str = base64.b64encode(r.content).decode('ascii')
        extension = mimetypes.guess_extension(r.headers.get('content-type', '').split(';')[0])
        id = uuid.uuid1().hex
        if extension == ".jpg" or extension == ".jpeg" or extension == ".png":
            doc = Document(id = id, classification = classification, base64 = base64_str, uri = link )
            return doc
        else:
            return None
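The extension check inside get_document can be isolated into a small sketch that needs no network access. The helper names and the ALLOWED_EXTENSIONS set are hypothetical, and the strip() and lower() calls are additions of this sketch that the function above does not perform.

```python
import mimetypes

ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png"}


def extension_from_content_type(header_value: str):
    """Strip parameters such as "; charset=..." and guess a file extension."""
    mime_type = header_value.split(";")[0].strip().lower()
    return mimetypes.guess_extension(mime_type)


def is_supported_image(header_value: str) -> bool:
    """True when the guessed extension is one of the formats the post keeps."""
    return extension_from_content_type(header_value) in ALLOWED_EXTENSIONS


png_ok = is_supported_image("image/png; charset=binary")
html_ok = is_supported_image("text/html; charset=utf-8")
missing_ok = is_supported_image("")
```

A response that is not a JPEG or PNG, or that carries no content-type header at all, is filtered out, which corresponds to get_document returning None.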
Next, we define the function to insert the Document objects we get from the previous function. We check for the uniqueness of the link to reduce duplicates in this function also:
    def move_to_db(self, link):
        doc = self.get_document(link)
        # check_if_it_exists expects the link first, then the classification
        sameness = self.db.check_if_it_exists(link, self.query.q)
        if doc is not None and sameness is None:
            self.db.insert_document(doc=doc)
Here, we can iterate through all the links gathered from SerpApi's Google Images Scraper API, and upload them to our Couchbase Storage Server:
    def move_all_images_to_db(self):
        self.serpapi_search()
        for result in self.results:
            try:
                self.move_to_db(result)
            except Exception:
                # Skip images that fail to download or insert
                print("\n Passed image")
Now that we have everything in place, let us wire add_couchbase.py into our main.py:
from fastapi import FastAPI
from add_couchbase import Download, Query, ImagesDataBase
from create import CSVCreator, ClassificationsArray
from dataset import CustomImageDataLoader, CustomImageDataset
from train import CNN, Train
from commands import TrainCommands
app = FastAPI()
@app.get("/")
def read_root():
    return {"Hello": "World"}
@app.post("/add_to_db/")
def create_query(query: Query):
    db = ImagesDataBase()
    serpapi = Download(query, db)
    # move_all_images_to_db() runs the SerpApi search itself, so no separate search call is needed
    serpapi.move_all_images_to_db()
    return {"status": "Complete"}
...
Collecting Images and Storing Them with Classifications
Let's put everything we made into practice. Run the server with the following command:
uvicorn main:app --host 0.0.0.0 --port 8000
and then head to localhost:8000/docs to try out the /add_to_db/ endpoint with the following request body:
{
  "google_domain": "google.com",
  "num": "100",
  "ijn": "0",
  "q": "string",
  "api_key": "<Your API Key>"
}
If you observe the terminal, you will see that the process of updating the database is happening in real time:
...
Insert CAS:
1655331991397793792
Downloading https://target.scene7.com/is/image/Target/GUEST_c3800365-97f3-4fe9-8061-8894a378cc85?wid=488&hei=488&fmt=pjpeg
Insert CAS:
1655331991890821120
Downloading https://solidstarts.com/wp-content/uploads/Mango_edited-scaled.jpg
Insert CAS:
1655331993304432640
Downloading https://www.netmeds.com/images/cms/wysiwyg/blog/2019/04/Raw_Mango_898.jpg
...
If we query the database even before the process has finished, we can see that entries with the classification label Mango are being added. Here's the command for it:
SELECT COUNT(*) as max_items FROM `images`.image.labelled_image WHERE classification = 'Mango'
At this point it had already added 67 unique images to the database, which we can use to train our network in the coming weeks.
Conclusion
Databases that support N1QL, such as Couchbase, can offer fast response times compared to other storage options. For this reason, I considered refactoring this part an essential step before going any further. This implementation should give us the speed and scalability we need for the coming weeks' challenges in comparing different approaches to image classification. It also matters for async handling of some operations, such as inserting images into the database instead of naming and saving them through the OS.