Send & Receive Data#

After a Client has connected to a Deployment or a Flow, it can send requests to the service using its post() method. This expects as inputs the Executor endpoint that you want to target, as well as a Document or Iterable of Documents:

from docarray.documents import TextDoc

d1 = TextDoc(text='hello')
client = Client(...)

client.post('/endpoint', d1)
from docarray.documents import TextDoc

d1 = TextDoc(text='hello')
d2 = TextDoc(text='world')
client = Client(...)

client.post('/endpoint', inputs=[d1, d2])
from docarray import DocList

d1 = TextDoc(text='hello')
d2 = TextDoc(text='world')
da = DocList[TextDoc]([d1, d2])
client = Client(...)

client.post('/endpoint', da)
from docarray.documents import TextDoc

def doc_gen():
    for j in range(10):
        yield TextDoc(content=f'hello {j}')
        
client = Client(...)

client.post('/endpoint', doc_gen)
client = Client(...)

client.post('/endpoint')

Caution

Flow and Deployment also provide a .post() method that follows the same interface as client.post(). However, once your solution is deployed remotely, these objects are not present anymore. Hence, deployment.post() and flow.post() are not recommended outside of testing or debugging use cases.

Send data in batches#

Especially during indexing, a Client can send up to thousands or millions of Documents to a Flow. Those Documents are internally batched into a Request, providing a smaller memory footprint and faster response times thanks to callback functions.

The size of these batches can be controlled with the request_size keyword. The default request_size is 100 Documents. The optimal size will depend on your use case.

from jina import Deployment, Client
from docarray import DocList, BaseDoc

with Deployment() as dep:
    client = Client(port=f.port)
    client.post('/', DocList[BaseDoc](BaseDoc() for _ in range(100)), request_size=10)

Send data asynchronously#

There is an async version of the Python Client which works with post() and mutate().

While the standard Client is also asynchronous under the hood, its async version exposes this fact to the outside world, by allowing coroutines as input, and returning an asynchronous iterator. This means you can iterate over Responses one by one, as they come in.

import asyncio

from jina import Client, Deployment
from docarray import BaseDoc


async def async_inputs():
    for _ in range(10):
        yield BaseDoc()
        await asyncio.sleep(0.1)


async def run_client(port):
    client = Client(port=port, asyncio=True)
    async for resp in client.post('/', async_inputs, request_size=1):
        print(resp)


with Deployment() as dep:  # Using it as a Context Manager will start the Deployment
    asyncio.run(run_client(dep.port))

Async send is useful when calling an external service from an Executor.

from jina import Client, Executor, requests
from docarray import DocList, BaseDoc


class DummyExecutor(Executor):
    c = Client(host='grpc://0.0.0.0:51234', asyncio=True)

    @requests
    async def process(self, docs: DocList[BaseDoc], **kwargs) -> DocList[BaseDoc]:
        return self.c.post('/', docs, return_type=DocList[BaseDoc])

Send data to specific Executors#

Usually a Flow will send each request to all Executors with matching endpoints as configured. But the Client also allows you to only target specific Executors in a Flow using the target_executor keyword. The request will then only be processed by the Executors which match the provided target_executor regex. Its usage is shown in the listing below.

from jina import Client, Executor, Flow, requests
from docarray import DocList
from docarray.documents import TextDoc


class FooExecutor(Executor):
    @requests
    async def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            doc.text = f'foo was here and got {len(docs)} document'

class BarExecutor(Executor):
    @requests
    async def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            doc.text = f'bar was here and got {len(docs)} document'


f = (
    Flow()
        .add(uses=FooExecutor, name='fooExecutor')
        .add(uses=BarExecutor, name='barExecutor')
)

with f:  # Using it as a Context Manager will start the Flow
    client = Client(port=f.port)
    docs = client.post(on='/', inputs=TextDoc(text=''), target_executor='bar*', return_type=DocList[TextDoc])
    print(docs.text)

This will send the request to all Executors whose names start with ‘bar’, such as ‘barExecutor’. In the simplest case, you can specify a precise Executor name, and the request will be sent only to that single Executor.

Use Unary or Streaming gRPC#

The Flow with gRPC protocol implements the unary and the streaming RPC lifecycle for communicating with the clients. When sending more than one request using the batching or the iterator mechanism, the RPC lifecycle for the post() method can be controlled using the stream boolean method argument. By default the stream option is set to True which uses the streaming RPC to send the data to the Flow. If the stream option is set to False, the unary RPC is used to send the data to the Flow. Both RPC lifecycles are implemented to provide the flexibility for the clients.

There might be performance penalties when using the streaming RPC in the Python gRPC implementation.

Hint

This option is only valid for gRPC protocol.

Refer to the gRPC Performance Best Practices guide for more implementations details and considerations.

Configure gRPC Client options#

The Client supports the grpc_channel_options parameter which allows more customization of the gRPC channel construction. The grpc_channel_options parameter accepts a dictionary of gRPC configuration options which will be used to overwrite the default options. The default gRPC options are:

('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1),
('grpc.keepalive_time_ms', 9999),
# send keepalive ping every 9 second, default is 2 hours.
('grpc.keepalive_timeout_ms', 4999),
# keepalive ping time out after 4 seconds, default is 20 seconds
('grpc.keepalive_permit_without_calls', True),
# allow keepalive pings when there's no gRPC calls
('grpc.http1.max_pings_without_data', 0),
# allow unlimited amount of keepalive pings without data
('grpc.http1.min_time_between_pings_ms', 10000),
# allow grpc pings from client every 9 seconds
('grpc.http1.min_ping_interval_without_data_ms', 5000),
# allow grpc pings from client without data every 4 seconds

If the max_attempts is greater than 1 on the post() method, the grpc.service_config option will not be applied since the retry options will be configured internally.

Refer to the channel_arguments section for the full list of available gRPC options.

Hint

Refer to the Configure Executor gRPC options section for configuring the Executor gRPC options.

Returns#

post() returns a DocList containing all Documents flattened over all Requests. When setting return_responses=True, this behavior is changed to returning a list of Response objects.

If a callback function is provided, client.post() will return none.

from jina import Deployment, Client
from docarray import DocList
from docarray.documents import TextDoc

with Deployment() as dep:
    client = Client(port=dep.port)
    docs = client.post(on='', inputs=TextDoc(text='Hi there!'), return_type=DocList[TextDoc])
    print(docs)
    print(docs.text)
<DocList[TextDoc] (length=1)>
['Hi there!']
from docarray import DocList
from docarray.documents import TextDoc

with Deployment() as dep:
    client = Client(port=dep.port)
    resp = client.post(on='', inputs=TextDoc(text='Hi there!'), return_type=DocList[TextDoc], return_responses=True)
    print(resp)
    print(resp[0].docs.text)
[<jina.types.request.data.DataRequest ('header', 'parameters', 'routes', 'data') at 140619524354592>]
['Hi there!']
from jina import Flow, Client
from docarray import DocList
from docarray.documents import TextDoc

with Deployment() as dep:
    client = Client(port=f.port)
    resp = client.post(
        on='',
        inputs=TextDoc(text='Hi there!'),
        on_done=lambda resp: print(resp.docs.texts),
    )
    print(resp)
['Hi there!']
None

Return type#

post() returns the Documents as the server sends them back. In order for the client to return the user’s expected document type, the return_type argument is required.

The return_type can be a parametrized DocList or a single BaseDoc type. If the return type parameter is a BaseDoc type, the results will be returned as a DocList[T] except if the result contains a single Document, in that case the only Document in the list is returned instead of the DocList.

Callbacks vs returns#

Callback operates on every sub-request generated by request_size. The callback function consumes the response one by one. The old response is immediately free from the memory after the consumption.

When callback is not provided, the client accumulates all DocLists of all Requests before returning. This means you will not receive results until all Requests have been processed, which is slower and requires more memory.

Force the order of responses#

Note that the Flow processes Documents in an asynchronous and a distributed manner. The order of the Flow processing the requests may not be the same order as the Client sending them. Hence, the response order may also not be consistent as the sending order.

To force the order of the results to be deterministic and the same as when they are sent, passing results_in_order parameter to post().

import random
import time
from jina import Deployment, Executor, requests, Client
from docarray import DocList
from docarray.documents import TextDoc


class RandomSleepExecutor(Executor):
    @requests
    def foo(self, *args, **kwargs):
        rand_sleep = random.uniform(0.1, 1.3)
        time.sleep(rand_sleep)


dep = Deployment(uses=RandomSleepExecutor, replicas=3)
input_text = [f'ordinal-{i}' for i in range(180)]
input_da = DocList[TextDoc]([TextDoc(text=t) for t in input_text])

with f:
    c = Client(port=dep.port, protocol=dep.protocol)
    output_da = c.post('/', inputs=input_da, request_size=10, return_type=DocList[TextDoc], results_in_order=True)
    for input, output in zip(input_da, output_da):
        assert input.text == output.text