Add Endpoints#
Methods decorated with @requests are mapped to network endpoints while serving.
Decorator#
Executor methods decorated with @requests are bound to specific network requests, and respond to network queries.

Both def and async def methods can be decorated with @requests.

You can import the @requests decorator via:

from jina import requests

requests takes an optional on= parameter, which binds the decorated method to the specified route:
from jina import Executor, requests
import asyncio


class RequestExecutor(Executor):
    @requests(on=['/index', '/search'])  # foo is bound to `/index` and `/search` endpoints
    def foo(self, **kwargs):
        print('Calling foo')

    @requests(on='/other')  # bar is bound to `/other` endpoint
    async def bar(self, **kwargs):
        await asyncio.sleep(1.0)
        print('Calling bar')
Run the example:
from jina import Deployment

dep = Deployment(uses=RequestExecutor)
with dep:
    dep.post(on='/index', inputs=[])
    dep.post(on='/other', inputs=[])
    dep.post(on='/search', inputs=[])
─────────────────────── 🎉 Deployment is ready to serve! ───────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:59525 │
│ 🔒 Private 192.168.1.13:59525 │
│ 🌍 Public 197.244.143.223:59525 │
╰──────────────────────────────────────────╯
Calling foo
Calling bar
Calling foo
Default binding#
A class method decorated with plain @requests (without on=) is the default handler for all endpoints. This means it is the fallback handler for endpoints that are not found: for example, c.post(on='/blah', ...) invokes MyExecutor.foo.
from jina import Executor, requests
import asyncio


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)

    @requests(on='/index')
    async def bar(self, **kwargs):
        await asyncio.sleep(1.0)
        print('Calling bar')
No binding#
If a class has no @requests decorator, the request simply passes through without any processing.
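For example, in a Flow such an Executor simply forwards Documents to the next one. Here is a minimal sketch of that pass-through behavior (the TextDoc schema and the Executor names are illustrative, not part of Jina):

from jina import Executor, Flow, requests
from docarray import DocList, BaseDoc


class TextDoc(BaseDoc):
    text: str = ''


class PassThrough(Executor):
    pass  # no @requests-decorated methods: requests pass through unprocessed


class Appender(Executor):
    @requests(on='/index')
    def append(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            doc.text += ' processed'


f = Flow().add(uses=PassThrough).add(uses=Appender)
with f:
    docs = f.post(
        on='/index',
        inputs=DocList[TextDoc]([TextDoc(text='hello')]),
        return_type=DocList[TextDoc],
    )
    print(docs[0].text)  # prints 'hello processed': PassThrough left the Document untouched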
Document type binding#
When using docarray>=0.30, each endpoint bound by the requests decorator can have different input and output Document types. You can specify these types by adding type annotations to the decorated methods or by using the request_schema and response_schema arguments. The design is inspired by FastAPI.

These schemas have to be Documents inheriting from BaseDoc or a parametrized DocList. You can see the differences between using a single Document and a DocList for serving in the Executor API section.
from jina import Executor, requests
from docarray import DocList, BaseDoc
from docarray.typing import AnyTensor
from typing import Optional
import asyncio


class BarInputDoc(BaseDoc):
    text: str = ''


class BarOutputDoc(BaseDoc):
    text: str = ''
    embedding: Optional[AnyTensor] = None


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)

    @requests(on='/index')
    async def bar(self, docs: DocList[BarInputDoc], **kwargs) -> DocList[BarOutputDoc]:
        print('Calling bar')
        await asyncio.sleep(1.0)
        ret = DocList[BarOutputDoc]()
        for doc in docs:
            # `embed` stands for any user-provided embedding function
            ret.append(BarOutputDoc(text=doc.text, embedding=embed(doc.text)))
        return ret
Note that the type hint is actually more than just a hint: the Executor uses it to infer the actual schema of the endpoint.
You can also explicitly define the schema of the endpoint by using the request_schema and response_schema parameters of the requests decorator:
class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        print(kwargs)

    @requests(
        on='/index',
        request_schema=DocList[BarInputDoc],
        response_schema=DocList[BarOutputDoc],
    )
    async def bar(self, docs, **kwargs):
        print('Calling bar')
        await asyncio.sleep(1.0)
        ret = DocList[BarOutputDoc]()
        for doc in docs:
            # `embed` stands for any user-provided embedding function
            ret.append(BarOutputDoc(text=doc.text, embedding=embed(doc.text)))
        return ret
If there is no request_schema and response_schema, the type hint is used to infer the schema. If both exist, request_schema and response_schema take precedence.
Note
When no type annotation or argument is provided, Jina assumes that LegacyDocument is the type used. This is intended to ease the transition from using Jina with docarray<0.30.0 to using it with newer versions.
Executor API#
Methods decorated by @requests need to follow a specific API for Jina to serve them with a Deployment or Flow.
An Executor's job is to process Documents that are sent via the network. Executors can work on these Documents one by one or in batches. This behavior is determined by an argument:

- doc if you want your Executor to work on one Document at a time, or
- docs if you want to work on batches of Documents.
These APIs and related type annotations also affect how your OpenAPI looks when deploying the Executor with jina.Deployment or jina.Flow using the HTTP protocol.
Single Document#
When using doc as a keyword argument, you need to add a single BaseDoc as your request and response schema, as seen in the document type binding section. Jina will ensure that even if multiple Documents are sent from the client, the Executor will process only one at a time.
from typing import Dict, Union, TypeVar

from jina import Executor, requests
from docarray import BaseDoc

T_input = TypeVar('T_input', bound='BaseDoc')
T_output = TypeVar('T_output', bound='BaseDoc')


class MyExecutor(Executor):
    @requests
    async def foo(
        self,
        doc: T_input,
        **kwargs,
    ) -> Union[T_output, Dict, None]:
        pass
Working on single Documents instead of batches can make your interface and code cleaner. In many cases, like in Generative AI, input rarely comes in batches, and models can be heavy enough that they cannot benefit from processing multiple inputs at the same time.
Batching documents#
When using docs as a keyword argument, you need to add a parametrized DocList as your request and response schema, as seen in the document type binding section. In this case, Jina will ensure that all of the request's Documents are passed to the Executor. The request_size parameter of the Client controls how many Documents are passed to the server in each request.
When using batches, you can leverage the dynamic batching feature.
from typing import Dict, Union, TypeVar

from jina import Executor, requests
from docarray import DocList, BaseDoc

T_input = TypeVar('T_input', bound='BaseDoc')
T_output = TypeVar('T_output', bound='BaseDoc')


class MyExecutor(Executor):
    @requests
    async def foo(
        self,
        docs: DocList[T_input],
        **kwargs,
    ) -> Union[DocList[T_output], Dict, None]:
        pass
Working on batches of Documents in the same method call can make sense, especially for serving models that handle multiple inputs at the same time, like when serving embedding models.
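To see how request_size shapes batching from the client side, here is a hedged sketch (it assumes a server with an /index endpoint listening on port 12345, and the TextDoc schema is illustrative):

from jina import Client
from docarray import DocList, BaseDoc


class TextDoc(BaseDoc):
    text: str = ''


client = Client(port=12345)
docs = DocList[TextDoc]([TextDoc(text=f'doc {i}') for i in range(32)])
# with request_size=8, the 32 Documents travel in 4 requests of 8 Documents each,
# so the Executor's method is invoked once per batch of 8
client.post(on='/index', inputs=docs, request_size=8, return_type=DocList[TextDoc])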
Parameters#
Often, the behavior of a model or service depends not just on the input data (documents in this case) but also on other parameters. An example might be special attributes that some ML models allow you to configure, like maximum token length or other attributes not directly related to the data input.
Executor methods decorated with @requests accept a parameters attribute in their signature to provide this flexibility. This attribute can be a plain Python dictionary or a Pydantic model. To get a Pydantic model, the parameters argument needs to have the model as a type annotation.
from typing import Dict, Union, TypeVar

from jina import Executor, requests
from docarray import DocList, BaseDoc
from pydantic import BaseModel

T_input = TypeVar('T_input', bound='BaseDoc')
T_output = TypeVar('T_output', bound='BaseDoc')
T_parameters = TypeVar('T_parameters', bound='BaseModel')


class MyExecutor(Executor):
    @requests
    async def foo(
        self,
        docs: DocList[T_input],
        parameters: Union[Dict, T_parameters],
        **kwargs,
    ) -> Union[DocList[T_output], Dict, None]:
        pass
Defining parameters as a Pydantic model instead of a simple dictionary has two main benefits:

- Validation and default values: the parameters the Executor expects are validated before the Executor can access any invalid key, and you can easily define defaults.
- A descriptive OpenAPI definition when using the HTTP protocol.
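As an illustration, here is a sketch of an Executor whose parameters are validated by a Pydantic model (the schema, model, and field names are all hypothetical):

from jina import Executor, requests
from docarray import DocList, BaseDoc
from pydantic import BaseModel, Field


class TextDoc(BaseDoc):
    text: str = ''


class GenerationParams(BaseModel):
    max_tokens: int = Field(default=256, description='Maximum number of tokens to generate')
    temperature: float = 0.7  # default used when the client omits the field


class MyExecutor(Executor):
    @requests(on='/generate')
    def generate(
        self, docs: DocList[TextDoc], parameters: GenerationParams, **kwargs
    ) -> DocList[TextDoc]:
        # by the time this runs, `parameters` is a validated GenerationParams instance
        print(parameters.max_tokens, parameters.temperature)
        return docs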
Tracing context#
Executors also accept tracing_context as input if you want to add custom traces in your Executor.
from typing import Dict, Optional, Union, TypeVar

from jina import Executor, requests
from docarray import DocList, BaseDoc
from pydantic import BaseModel

T_input = TypeVar('T_input', bound='BaseDoc')
T_output = TypeVar('T_output', bound='BaseDoc')
T_parameters = TypeVar('T_parameters', bound='BaseModel')


class MyExecutor(Executor):
    @requests
    async def foo(
        self,
        tracing_context: Optional['Context'],  # 'Context' refers to the OpenTelemetry tracing context
        **kwargs,
    ) -> Union[DocList[T_output], Dict, None]:
        pass
Other arguments#
When using an Executor in a Flow, you may use it to merge results from upstream Executors. For these merging Executors, you can use one of the extra arguments.
Hint
You can also use an Executor as a simple Pythonic class. This is especially useful for locally testing the Executor-specific logic before serving it.
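For instance, a minimal sketch of local testing without any server (the foo method is illustrative):

from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        return {'ok': True}


# instantiate it like any Python class and call the method directly
exec_instance = MyExecutor()
print(exec_instance.foo())  # {'ok': True}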
Hint
If you don't need certain arguments, you can suppress them into **kwargs. For example:
from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo_using_docs_arg(self, docs, **kwargs):
        print(docs)

    @requests
    def foo_using_docs_parameters_arg(self, docs, parameters, **kwargs):
        print(docs)
        print(parameters)

    @requests
    def foo_using_no_arg(self, **kwargs):
        # the args are suppressed into kwargs
        print(kwargs)
Returns#
Every Executor method can return in three ways:

1. You can directly return a BaseDoc or DocList object.
2. If you return None or don't have a return statement in your method, then the original docs or doc object (potentially mutated by your function) is returned.
3. If you return a dict object, it is considered a result and returned in parameters['__results__'] to the client.
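For example, the second case (mutating in place and returning nothing) could look like the following sketch, where the TextDoc schema is illustrative:

from jina import Executor, requests
from docarray import DocList, BaseDoc


class TextDoc(BaseDoc):
    text: str = ''


class Mutator(Executor):
    @requests
    def upper(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            doc.text = doc.text.upper()  # mutate in place
        # no return statement: the mutated `docs` are sent back to the client

The next example demonstrates the third case, returning a dict: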
from jina import requests, Executor, Deployment


class MyExec(Executor):
    @requests(on='/status')
    def status(self, **kwargs):
        return {'internal_parameter': 20}


with Deployment(uses=MyExec) as dep:
    print(dep.post(on='/status', return_responses=True)[0].to_dict()['parameters'])
{"__results__": {"my_executor/rep-0": {"internal_parameter": 20.0}}}
Streaming endpoints#
Executors can stream Documents individually rather than as a whole DocList. This is useful when you want to return Documents one by one and have the client process them immediately as they arrive. It is especially helpful for Generative AI use cases, where a Large Language Model generates text token by token and the client displays tokens as they arrive. Streaming endpoints receive one Document as input and yield one Document at a time.
Note
Streaming endpoints are only supported for the HTTP and gRPC protocols, and for Deployments and Flows with a single Executor.
For HTTP Deployments, streaming Executors generate a GET endpoint. The GET endpoint supports passing document fields in the request body or as URL query parameters; however, query parameters only support string, integer, or float fields, whereas the request body supports all serializable DocArray types. The Jina client uses the request body.
A streaming endpoint has the following signature:
from jina import Executor, requests, Deployment
from docarray import BaseDoc


# first define schemas
class MyDocument(BaseDoc):
    text: str


# then define the Executor
class MyExecutor(Executor):
    @requests(on='/hello')
    async def task(self, doc: MyDocument, **kwargs) -> MyDocument:
        for i in range(100):
            yield MyDocument(text=f'hello world {i}')


with Deployment(
    uses=MyExecutor,
    port=12345,
    protocol='http',
    cors=True,
) as dep:
    dep.block()
From the client side, any SSE client can be used to receive the Documents, one at a time. Jina offers a standard Python client for using the streaming endpoint:
from jina import Client

client = Client(port=12345, protocol='http', asyncio=True)  # or protocol='grpc'
async for doc in client.stream_doc(
    on='/hello', inputs=MyDocument(text='hello world'), return_type=MyDocument
):
    print(doc.text)
hello world 0
hello world 1
hello world 2
You can also refer to the following JavaScript code to connect to the streaming endpoint from your browser:
<!DOCTYPE html>
<html lang="en">
<body>
<h2>SSE Client</h2>
<script>
    const evtSource = new EventSource("http://localhost:8080/hello?id=1&exec_endpoint=/hello");
    evtSource.addEventListener("update", function(event) {
        // Logic to handle status updates
        console.log(event);
    });
    evtSource.addEventListener("end", function(event) {
        console.log('Handling end....');
        evtSource.close();
    });
</script>
</body>
</html>
Exception handling#
Exceptions inside @requests-decorated functions can simply be raised.
from jina import Executor, requests


class MyExecutor(Executor):
    @requests
    def foo(self, **kwargs):
        raise NotImplementedError('no time for it')
Example usage and output
from jina import Deployment

dep = Deployment(uses=MyExecutor)


def print_why(resp):
    print(resp.status.description)


with dep:
    dep.post('', on_error=print_why)
[...]
executor0/rep-0@28271[E]:NotImplementedError('no time for it')
add "--quiet-error" to suppress the exception details
[...]
File "/home/joan/jina/jina/jina/serve/executors/decorators.py", line 115, in arg_wrapper
return fn(*args, **kwargs)
File "/home/joan/jina/jina/toy.py", line 8, in foo
raise NotImplementedError('no time for it')
NotImplementedError: no time for it
NotImplementedError('no time for it')
OpenAPI from Executor endpoints#
When deploying an Executor and serving it with HTTP, Jina uses FastAPI to expose all Executor endpoints as HTTP endpoints, and you get a corresponding OpenAPI schema via the Swagger UI. You can also add descriptions and examples to your DocArray and Pydantic types so your users and clients get a well-documented API.
Let’s see how this would look:
from jina import Executor, requests, Deployment
from docarray import BaseDoc
from pydantic import BaseModel, Field


class Prompt(BaseDoc):
    """Prompt Document to be input to a Language Model"""

    text: str = Field(description='The text of the prompt', example='Write me a short poem')


class Generation(BaseDoc):
    """Document representing the generation of the Large Language Model"""

    prompt: str = Field(description='The original prompt that created this output')
    text: str = Field(description='The actual generated text')


class LLMCallingParams(BaseModel):
    """Calling parameters of the LLM model"""

    num_max_tokens: int = Field(
        default=5000,
        description='The limit of tokens the model can take, it can affect the memory consumption of the model',
    )


class MyLLMExecutor(Executor):
    @requests(on='/generate')
    def generate(self, doc: Prompt, parameters: LLMCallingParams, **kwargs) -> Generation:
        ...


with Deployment(port=12345, protocol='http', uses=MyLLMExecutor) as dep:
    dep.block()
──── 🎉 Deployment is ready to serve! ────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol http │
│ 🏠 Local 0.0.0.0:54322 │
│ 🔒 Private xxx.xx.xxx.xxx:54322 │
│ 🌍 Public xx.xxx.xxx.xxx:54322 │
╰──────────────────────────────────────────╯
╭─────────── 💎 HTTP extension ────────────╮
│ 💬 Swagger UI 0.0.0.0:54322/docs │
│ 📚 Redoc 0.0.0.0:54322/redoc │
╰──────────────────────────────────────────╯
After running this code, you can open 0.0.0.0:12345/docs in your browser:
Note how the schema defined in the OpenAPI also considers the examples and descriptions for the types and fields.
The same behavior is seen when serving Executors with a jina.Flow. In that case, the input and output schemas of each endpoint are inferred from the Flow's topology: if two Executors are chained in a Flow, the schema of the input is the schema of the first Executor and the schema of the response corresponds to the output of the second Executor.
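As an illustrative sketch of this schema chaining (all schemas and Executor names here are assumptions, not part of Jina):

from jina import Executor, Flow, requests
from docarray import DocList, BaseDoc


class InputDoc(BaseDoc):
    text: str = ''


class MiddleDoc(BaseDoc):
    text: str = ''


class OutputDoc(BaseDoc):
    text: str = ''


class FirstExec(Executor):
    @requests
    def step1(self, docs: DocList[InputDoc], **kwargs) -> DocList[MiddleDoc]:
        return DocList[MiddleDoc]([MiddleDoc(text=d.text + ' step1') for d in docs])


class SecondExec(Executor):
    @requests
    def step2(self, docs: DocList[MiddleDoc], **kwargs) -> DocList[OutputDoc]:
        return DocList[OutputDoc]([OutputDoc(text=d.text + ' step2') for d in docs])


# the OpenAPI exposed by the Flow uses InputDoc as the request schema and
# OutputDoc as the response schema, inferred from the topology
f = Flow(protocol='http').add(uses=FirstExec).add(uses=SecondExec)
with f:
    f.block()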