
Tutorial: A deep-dive with Executors

Susana Guzman

Last time we talked about how to create the hello world chatbot, but we didn't go into much detail about Executors. Let's take a closer look at them now. We will create a simple logger Executor that logs information from the Documents we pass to it and saves it to a file. We will also see how to push our Executor to Jina Hub so we can use it later.

Set-up & overview

We recommend creating a new Python virtual environment to have a clean install of Jina and prevent dependency clashes.

We can start by installing Jina:

pip install "jina[standard]"

For more information on installing Jina, refer to this page.

Create your Executor

To create your Executor, you just need to run this command in your terminal:

jina hub new

A wizard will ask you some questions about the Executor. For the basic configuration, it asks for two things: the Executor's name and where it should be saved. For this tutorial, we will call ours RequestLogger, and you can save it wherever you keep your project. The wizard will also ask whether you want a more advanced configuration, but that is unnecessary for this tutorial.

Logger Executor

Once we have followed the wizard, our folder structure is ready and we can start working on executor.py. Open that file and add the following imports:

import os
import time
from typing import Dict, Optional

from jina import Executor, DocumentArray, requests
from jina.logging.logger import JinaLogger

Then we create our class that inherits from the Executor base class. We will call ours RequestLogger:

class RequestLogger(Executor):

Our Executor will have two methods. One for the constructor and one for the actual logging:

class RequestLogger(Executor):
    def __init__(self, *args, **kwargs):
        pass  # whatever we need for our constructor

    def log(self):
        pass  # whatever we need for our logging

It could be helpful to specify the number of Documents we want to work with, so we pass this directly in the arguments of our constructor:

def __init__(self,
             default_log_docs: int = 1,  # here you can pass whatever other arguments you need
             *args, **kwargs):

Ok, now we start creating our constructor method. The first thing you need to do, before any custom logic, is this:

super().__init__(*args, **kwargs)
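
To see why this call has to come first, here is a minimal sketch with a toy base class. ToyBase, LateLogger and EarlyLogger are hypothetical names used only for illustration. They are not part of Jina, and the real Executor does more work in its constructor than this stand-in.

```python
import os


class ToyBase:
    """Hypothetical stand-in for a base class that prepares state in __init__."""

    def __init__(self, workspace: str = 'workspace'):
        self.workspace = workspace  # only available after this constructor runs


class LateLogger(ToyBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)  # base class state is set up first
        self.log_path = os.path.join(self.workspace, 'log.txt')


class EarlyLogger(ToyBase):
    def __init__(self, *args, **kwargs):
        # Bug: self.workspace does not exist yet at this point
        self.log_path = os.path.join(self.workspace, 'log.txt')
        super().__init__(*args, **kwargs)


ok = LateLogger(workspace='my_workspace')

try:
    EarlyLogger(workspace='my_workspace')
    early_failed = False
except AttributeError:
    early_failed = True
```

In this toy version, custom logic that runs before the parent constructor fails with an AttributeError, because the attribute it relies on has not been set yet.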

We now set the default_log_docs we got from the arguments:

self.default_log_docs = default_log_docs

For logging, we need to create an instance of JinaLogger. We also need to specify the path where our file will be saved.

self.logger = JinaLogger('req_logger')
self.log_path = os.path.join(self.workspace, 'log.txt')

And finally, we create an empty log file if it doesn't exist yet:

if not os.path.exists(self.log_path):
    with open(self.log_path, 'w'): pass
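
The check above can also be tried outside an Executor. The following is a small sketch that assumes a temporary directory in place of self.workspace. ensure_log_file is a hypothetical helper name, not a Jina function.

```python
import os
import tempfile


def ensure_log_file(workspace: str, name: str = 'log.txt') -> str:
    """Create an empty log file in `workspace` if it is missing; return its path."""
    os.makedirs(workspace, exist_ok=True)      # make sure the folder itself exists
    log_path = os.path.join(workspace, name)
    if not os.path.exists(log_path):
        with open(log_path, 'w'):              # opening in 'w' mode creates the file
            pass
    return log_path


workspace = tempfile.mkdtemp()                  # stand-in for self.workspace
first = ensure_log_file(workspace)

with open(first, 'a') as f:
    f.write('existing entry\n')

second = ensure_log_file(workspace)             # second call must not truncate the file
with open(second) as f:
    content = f.read()
```

The existence check matters because opening an existing file in 'w' mode would empty it, and earlier log entries would be lost.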

Ok, that's it for our constructor. By now we should have something like this:


class RequestLogger(Executor):                                                                      # needs to inherit from Executor
    def __init__(self,
                default_log_docs: int = 1,                                                          # number of documents to log
                *args, **kwargs):                                                                   # *args and **kwargs are required for Executor
        super().__init__(*args, **kwargs)                                                           # before any custom logic
        self.default_log_docs = default_log_docs
        self.logger = JinaLogger('req_logger')                                                      # create instance of JinaLogger
        self.log_path = os.path.join(self.workspace, 'log.txt')                                     # set path to save the log.txt
        if not os.path.exists(self.log_path):                                                       # check the file doesn't exist already
            with open(self.log_path, 'w'): pass


We can start creating our log method now. First of all, we need the @requests decorator. It tells the Flow when the function should be called and for which endpoint. We use @requests without any endpoint, so our function is called on every request:

@requests
def log(self,
        docs: Optional[DocumentArray],
        parameters: Dict,
        **kwargs):

It's important to note the arguments here. It's not possible to redefine the interface of the public methods decorated by @requests, so you can't change the names of these arguments. To see exactly which parameters you can use, check our cookbook about Executors. If you would like to call your log function only at index time, specify the endpoint with on=, like this:

@requests(on='/index')
def log(self,
        docs: Optional[DocumentArray],
        parameters: Dict,
        **kwargs):

If you want more information on how to use this decorator, check our cookbook. In this example, we want to call our log function on every request, so we don't specify any endpoint.
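
To build an intuition for what on= does, here is a toy sketch of endpoint routing with a decorator. It is not how Jina implements @requests. toy_requests, ToyExecutor, dispatch and the DEFAULT marker are hypothetical names, and the only behaviour modelled is that a method bound to an endpoint is called for that endpoint, while a method without an endpoint acts as the fallback.

```python
from typing import Callable, Optional

DEFAULT = '__default__'  # hypothetical marker for "every endpoint"


def toy_requests(func: Optional[Callable] = None, *, on: Optional[str] = None):
    """Toy decorator usable as @toy_requests or @toy_requests(on='/index')."""
    def mark(f: Callable) -> Callable:
        f._endpoint = on or DEFAULT  # remember which endpoint this method serves
        return f
    return mark(func) if func is not None else mark


class ToyExecutor:
    @toy_requests(on='/index')
    def index(self, docs, **kwargs):
        return f'indexed {len(docs)} docs'

    @toy_requests
    def log(self, docs, **kwargs):
        return f'logged {len(docs)} docs'

    def dispatch(self, endpoint: str, docs):
        """Call the method bound to `endpoint`, or the default one otherwise."""
        routes = {}
        for name in dir(self):
            method = getattr(self, name)
            if callable(method) and hasattr(method, '_endpoint'):
                routes[method._endpoint] = method
        handler = routes.get(endpoint, routes.get(DEFAULT))
        return handler(docs)


executor = ToyExecutor()
index_result = executor.dispatch('/index', ['a', 'b'])
search_result = executor.dispatch('/search', ['a'])
```

In this sketch, a request to '/index' reaches the index method, while a request to '/search' falls back to log because no method is bound to that endpoint.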

Now we can add the logic for our function. First, we will print a line that displays some information. And then, we will save the details from our Documents:

self.logger.info('Request being processed...')

nr_docs = int(parameters.get('log_docs', self.default_log_docs))  # accessing parameters (numbers are passed as floats due to Protobuf)
with open(self.log_path, 'a') as f:
    f.write(f'request at time {time.time()} with {len(docs)} documents:\n')
    for i, doc in enumerate(docs):
        f.write(f'\tsearching with doc.id {doc.id}. content = {doc.content}\n')
        if i + 1 == nr_docs:
            break

Here you can set whatever logic you need for your Executor. By now, your code should look like this:

import os
import time
from typing import Dict, Optional

from jina import Executor, DocumentArray, requests
from jina.logging.logger import JinaLogger


class RequestLogger(Executor):                                                                      # needs to inherit from Executor
    def __init__(self,
                default_log_docs: int = 1,                                                          # your arguments
                *args, **kwargs):                                                                   # *args and **kwargs are required for Executor
        super().__init__(*args, **kwargs)                                                           # before any custom logic
        self.default_log_docs = default_log_docs
        self.logger = JinaLogger('req_logger')
        self.log_path = os.path.join(self.workspace, 'log.txt')
        if not os.path.exists(self.log_path):
            with open(self.log_path, 'w'): pass

    @requests                                                                                       # decorate, by default it will be called on every request
    def log(self,                                                                                   # arguments are automatically received
            docs: Optional[DocumentArray],
            parameters: Dict,
            **kwargs):
        self.logger.info('Request being processed...')

        nr_docs = int(parameters.get('log_docs', self.default_log_docs))                            # accessing parameters (numbers are passed as floats due to Protobuf)
        with open(self.log_path, 'a') as f:
            f.write(f'request at time {time.time()} with {len(docs)} documents:\n')
            for i, doc in enumerate(docs):
                f.write(f'\tsearching with doc.id {doc.id}. content = {doc.content}\n')
                if i + 1 == nr_docs:
                    break

And that's it. We have an Executor that takes whatever Documents we pass to it and logs them.
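
If you want to try the logging logic without starting a Flow, the following sketch reproduces it in plain Python. FakeDoc and write_log are hypothetical stand-ins used here instead of Jina's Document and the Executor method, and a temporary folder replaces the workspace.

```python
import os
import tempfile
import time
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeDoc:
    """Hypothetical stand-in exposing only the two fields the logger reads."""
    id: str
    content: str


def write_log(log_path: str, docs: List[FakeDoc], parameters: Dict,
              default_log_docs: int = 1) -> None:
    # Request parameters may arrive as floats (e.g. 2.0), so cast to int
    nr_docs = int(parameters.get('log_docs', default_log_docs))
    with open(log_path, 'a') as f:
        f.write(f'request at time {time.time()} with {len(docs)} documents:\n')
        for i, doc in enumerate(docs):
            f.write(f'\tsearching with doc.id {doc.id}. content = {doc.content}\n')
            if i + 1 == nr_docs:                # stop after nr_docs documents
                break


docs = [FakeDoc('1', 'I love cats'),
        FakeDoc('2', 'I love every type of cat'),
        FakeDoc('3', 'I guess dogs are ok')]

log_path = os.path.join(tempfile.mkdtemp(), 'log.txt')
write_log(log_path, docs, parameters={'log_docs': 2.0})

with open(log_path) as f:
    lines = f.read().splitlines()
```

With log_docs set to 2.0, the file contains one header line followed by the first two Documents. Note that with this loop a value of 0 never triggers the break, so every Document is logged.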

Ok, and what now? How can you use this in your app?

Push your Executor to Hub

We could use our Executor directly in our app, but here we will see how to push it to Jina Hub so we can share it with other people or use it later. To do this, open a terminal in the folder that contains your executor.py (in this case, the RequestLogger folder) and type:

jina hub push --public .

This means you will push your Executor publicly to Jina Hub. The last dot means you will use your current path. Once you run that command, you should see something like this:

[Screenshot: output of the jina hub push command, including the Executor ID]

Since we pushed our Executor using the --public flag, the only thing we will use is the ID, which is zsor7fe6 in this case.

So now we have our Executor pushed to Jina Hub, and we can use it via the ID. Let's see how to do that.

Use your Executor

Let's now create some Flows that can use the Executor we just made. Create an app.py in the same folder as RequestLogger. Now open it and import Flow, DocumentArray, and Document before we create our main function:

from jina import Flow, DocumentArray, Document

def main():
    pass  # We'll have our Flows here

if __name__ == '__main__':
    main()

The Executor we just created logs whatever Documents we pass to it, so we need to create some Documents first. We'll do that in main():

def main():
    docs = DocumentArray()
    docs.append(Document(content='I love cats'))                # creating documents
    docs.append(Document(content='I love every type of cat'))
    docs.append(Document(content='I guess dogs are ok'))

We have three Documents in one DocumentArray. Now let's create a Flow and add the Executor we created. We reference it by the ID we got when we pushed it. In my case it was zsor7fe6, but you should replace it with the ID you got:


flow = Flow().add(
    uses='jinahub+docker://zsor7fe6',
    uses_with={                                  # RequestLogger arguments
        'default_log_docs': 3
    },
    volumes='workspace:/internal_workspace',     # mapping local folders to docker instance folders
    uses_metas={                                 # Executor (parent class) arguments
        'workspace': '/internal_workspace',      # this should match the above
    },
)

This seems like plenty of details but let's see them one by one.

uses='jinahub+docker://zsor7fe6',

Here you use uses= to specify the image of your Executor. This will start a Docker container with the image of the Executor we built and deployed in the previous step. So don't forget to change the ID to the correct one.

uses_with={                                         # RequestLogger arguments
            'default_log_docs': 3
        },

We need uses_with= to pass the arguments we need. In our case, we have only one argument: default_log_docs. In the constructor of our RequestLogger Executor, we defined the default_log_docs as 1, but we override it here with 3, so 3 will be the new value.
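
Conceptually, the entries of uses_with end up as keyword arguments of the Executor's constructor. The sketch below mimics that with a plain class. ToyRequestLogger is a hypothetical stand-in, and the real wiring happens inside Jina when the Flow starts the Executor.

```python
class ToyRequestLogger:
    """Hypothetical stand-in with the same constructor argument as RequestLogger."""

    def __init__(self, default_log_docs: int = 1, **kwargs):
        self.default_log_docs = default_log_docs


uses_with = {'default_log_docs': 3}

with_default = ToyRequestLogger()               # keeps the default of 1
with_override = ToyRequestLogger(**uses_with)   # the dict is unpacked into keyword arguments
```

Here with_default keeps the constructor's default, while with_override picks up the value from the dictionary.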

The next line refers to our workspace:

volumes='workspace:/internal_workspace',

Here we are mapping the workspace folder that will be created when we run our app to a folder called internal_workspace in Docker. We do this because our Executor logs the Documents into a file, and we want to save that file on our local disk. Without this mapping, the file would be saved inside the Docker container, and you would need to access that container to see it. The value of volumes= has the form local_folder:container_folder.

The last part overrides arguments too, but this time for the Executor's base class:

uses_metas={                                                # Executor (parent class) arguments
            'workspace': '/internal_workspace',                 # this should match the above
        },

In our case, the only argument we want to override is the workspace. If you don't do this, a folder named after your Executor class (RequestLogger) is created, and your information is saved there. But since we just mounted our local workspace folder at /internal_workspace in Docker, the Executor's workspace has to point to that same path.
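
Because the two values have to agree, a small sanity check can help before starting the Flow. This is only a sketch: container_path and workspace_matches are hypothetical helpers, and they assume the simple host:container form of the volumes string used in this tutorial.

```python
from typing import Dict


def container_path(volumes: str) -> str:
    """Return the container side of a 'host:container' mapping."""
    host, _, container = volumes.partition(':')
    return container


def workspace_matches(volumes: str, uses_metas: Dict[str, str]) -> bool:
    """True when the mounted container folder is the Executor's workspace."""
    return container_path(volumes) == uses_metas.get('workspace')


volumes = 'workspace:/internal_workspace'
uses_metas = {'workspace': '/internal_workspace'}

matches = workspace_matches(volumes, uses_metas)
mismatch = workspace_matches(volumes, {'workspace': '/other_folder'})
```

If the check returns False, the Executor would write its log to a folder inside the container that is not mapped to your local disk.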

Ok, we have our Flow ready with the Executor we deployed previously. We can use it now. Let's start by indexing the Documents:

with flow as f:                                  # Flow is a context manager
    f.post(
        on='/index',                             # the endpoint
        inputs=docs,                             # the documents we send as input
    )

The Executor we created doesn't care about what endpoint is used, so it performs the same operation no matter which endpoint you specify here. In this example, we set it to on='/index' anyway. You could use one endpoint for indexing and another for querying if you need to and your Executor defines the matching endpoints.

So far, your code should look like this:

from jina import Flow, DocumentArray, Document


def main():
    docs = DocumentArray()
    docs.append(Document(content='I love cats'))                # creating documents
    docs.append(Document(content='I love every type of cat'))
    docs.append(Document(content='I guess dogs are ok'))

    flow = Flow().add(                                          # provide as class name or jinahub+docker URI
        uses='jinahub+docker://zsor7fe6',                       # replace with the ID you got when pushing
        uses_with={                                         # RequestLogger arguments
            'default_log_docs': 3
        },
        volumes='workspace:/internal_workspace',                # mapping local folders to docker instance folders
        uses_metas={                                        # Executor (parent class) arguments
            'workspace': '/internal_workspace',                 # this should match the above
        },
    )

    with flow as f:                                             # Flow is a context manager
        f.post(
            on='/index',                                        # the endpoint
            inputs=docs,                                        # the documents we send as input
        )


if __name__ == '__main__':
    main()

If you run this, you will see a new workspace folder created with two other folders inside. One is called RequestLogger, or whatever name you used for your class. The other is for sharding, which is out of scope for this tutorial. Inside the sharding folder, called 0 in this case, you will find a log.txt file containing the 3 Documents with their information.
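
To inspect the result from Python instead of opening the file by hand, you could walk the workspace folder and print every log.txt you find. This is a sketch: find_logs is a hypothetical helper, and the demo builds a throwaway folder whose layout (workspace/RequestLogger/0/log.txt) is an assumption made for illustration.

```python
import os
import tempfile
from typing import List


def find_logs(workspace: str, name: str = 'log.txt') -> List[str]:
    """Return the paths of all files called `name` below `workspace`."""
    found = []
    for root, _dirs, files in os.walk(workspace):
        if name in files:
            found.append(os.path.join(root, name))
    return sorted(found)


# Demo folder imitating an assumed layout: workspace/RequestLogger/0/log.txt
workspace = tempfile.mkdtemp()
shard_dir = os.path.join(workspace, 'RequestLogger', '0')
os.makedirs(shard_dir)
with open(os.path.join(shard_dir, 'log.txt'), 'w') as f:
    f.write('request at time 0 with 3 documents:\n')

logs = find_logs(workspace)
for path in logs:
    with open(path) as f:
        print(path)
        print(f.read())
```

In your own project you would call find_logs('workspace') after running app.py, since the helper searches all subfolders regardless of how they are named.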


And that's it! You created an Executor, pushed it to Hub, and used it in your app.

There are still a lot of concepts to learn, so stay tuned for our upcoming tutorials.

If you have any issues following this tutorial, you can always get support from our Slack community.

Community

  • Slack community - a communication platform for developers to discuss Jina.
  • LinkedIn - get to know Jina AI as a company and find job opportunities.
  • Twitter - follow us and interact with us using hashtag #JinaSearch.
  • Company - know more about our company, we are fully committed to open-source!

License

Copyright (c) 2021 Jina AI Limited. All rights reserved.

Jina is licensed under the Apache License, Version 2.0. See LICENSE for the full license text.
