Flow#
Important
A Flow is a set of Deployments. Be sure to read up on those before diving more deeply into Flows!
A Flow orchestrates Executors into a processing pipeline to accomplish a task. Documents “flow” through the pipeline and are processed by Executors.
You can think of Flow as an interface to configure and launch your microservice architecture, while the heavy lifting is done by the services themselves. In particular, each Flow also launches a Gateway service, which can expose all other services through an API that you define.
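For example, a minimal sketch (the Executor names encoder and indexer are placeholders): the Gateway exposes the whole pipeline over HTTP on the port you choose.

from jina import Flow

# The Gateway serves the whole pipeline over HTTP on port 12345
f = Flow(protocol='http', port=12345).add(name='encoder').add(name='indexer')

with f:
    f.block()  # clients can now reach the Flow at http://localhost:12345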
Why use a Flow?#
Once you’ve learned about Documents, DocList and Executors, you can split a big task into small independent modules and services. But you need to chain them together to create, build and serve an application. Flows enable you to do exactly this.
Flows connect microservices (Executors) to build a service with proper client/server style interfaces over HTTP, gRPC, or WebSockets.
Flows let you scale these Executors independently to match your requirements.
Flows let you easily use other cloud-native orchestrators, such as Kubernetes, to manage your service.
Create#
The most trivial Flow is an empty one. It can be defined in Python or from a YAML file:
from jina import Flow
f = Flow()
jtype: Flow
Important
All arguments received by the Flow() API will be propagated to other entities (Gateway, Executor) with the following exceptions:
uses and uses_with won’t be passed to the Gateway.
port, port_monitoring, uses and uses_with won’t be passed to the Executors.
Tip
An empty Flow contains only the Gateway.
For production, you should define your Flows with YAML. This is because YAML files are independent of the Python logic code and easier to maintain.
Minimum working example#
from jina import Flow, Executor, requests
from docarray import DocList, BaseDoc
class MyExecutor(Executor):
    @requests(on='/bar')
    def foo(self, docs: DocList[BaseDoc], **kwargs) -> DocList[BaseDoc]:
        print(docs)

f = Flow().add(name='myexec1', uses=MyExecutor)

with f:
    f.post(on='/bar', inputs=BaseDoc(), return_type=DocList[BaseDoc], on_done=print)
Server:
from jina import Flow, Executor, requests
from docarray import DocList, BaseDoc
class MyExecutor(Executor):
    @requests(on='/bar')
    def foo(self, docs: DocList[BaseDoc], **kwargs) -> DocList[BaseDoc]:
        print(docs)

f = Flow(port=12345).add(name='myexec1', uses=MyExecutor)

with f:
    f.block()
Client:
from jina import Client
from docarray import DocList, BaseDoc
c = Client(port=12345)
c.post(on='/bar', inputs=BaseDoc(), return_type=DocList[BaseDoc], on_done=print)
my.yml:
jtype: Flow
executors:
  - name: myexec1
    uses: FooExecutor
    py_modules: exec.py
exec.py:
from jina import Executor, requests
from docarray import DocList, BaseDoc
from docarray.documents import TextDoc
class FooExecutor(Executor):
    @requests
    def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            doc.text = 'foo was here'
        docs.summary()
        return docs
from jina import Flow
from docarray import DocList, BaseDoc
from docarray.documents import TextDoc
f = Flow.load_config('my.yml')
with f:
    try:
        f.post(on='/bar', inputs=TextDoc(), on_done=print)
    except Exception as ex:
        # handle exception
        pass
Caution
The statement with f: starts the Flow, and exiting the indented with block stops the Flow, including all Executors defined in it.
Exceptions raised inside the with f: block will close the Flow context manager. If you don’t want this, use a try...except block to surround the statements that could potentially raise an exception.
Start and stop#
When a Flow starts, all included Executors (a single one for a Deployment, multiple for a Flow) start as well, making it possible to reach the service through its API.
There are three ways to start a Flow: in Python, from a YAML file, or from the terminal.
Generally in Python: use Deployment or Flow as a context manager in Python.
As an entrypoint from the terminal: use the Jina CLI and a Flow YAML file.
As an entrypoint from Python code: use Flow as a context manager inside an if __name__ == '__main__' block.
No context manager: manually call start() and close().
from jina import Flow
f = Flow()
with f:
    pass
jina flow --uses flow.yml
from jina import Flow
f = Flow()
if __name__ == '__main__':
    with f:
        pass
from jina import Flow
f = Flow()
f.start()
f.close()
The statement with f: starts the Flow, and exiting the indented with block stops the Flow, including all its Executors.
A successful start of a Flow looks like this:
Your addresses and entrypoints can be found in the output. When you enable more features such as monitoring, HTTP gateway, TLS encryption, this display expands to contain more information.
Multiprocessing spawn
Some corner cases require forcing a spawn start method for multiprocessing, for example if you encounter “Cannot re-initialize CUDA in forked subprocess”. Read more in the docs.
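If you hit this, one option (assuming your Jina version supports the JINA_MP_START_METHOD environment variable) is to force the spawn start method before creating the Flow, for example:

import os

# Force the 'spawn' start method for subprocesses
# (assumes JINA_MP_START_METHOD is supported by your Jina version)
os.environ['JINA_MP_START_METHOD'] = 'spawn'

from jina import Flow

f = Flow().add()
with f:
    f.block()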
Serve#
Serve forever#
In most scenarios, a Flow should remain reachable for prolonged periods of time. This can be achieved from Python or the terminal:
from jina import Flow
f = Flow()
with f:
    f.block()
The .block() method blocks the execution of the current thread or process, enabling external clients to access the Flow.
jina flow --uses flow.yml
In this case, the Flow can be stopped by interrupting the thread or process.
Serve until an event#
Alternatively, a multiprocessing or threading Event object can be passed to .block(), which stops the Flow once set.
from jina import Flow
import threading
def start_flow(stop_event):
    """start a blocking Flow."""
    f = Flow()
    with f:
        f.block(stop_event=stop_event)

e = threading.Event()  # create new Event
t = threading.Thread(name='Blocked-Flow', target=start_flow, args=(e,))
t.start()  # start Flow in new Thread

# do some stuff
e.set()  # set event and stop (unblock) the Flow
Serve on Google Colab#
Example built with docarray<0.30
This example is built using docarray<0.30. Most of the concepts are similar, but some of the APIs for building Executors change with newer docarray versions.
Google Colab provides an easy-to-use Jupyter notebook environment with GPU/TPU support. Flows are fully compatible with Google Colab and you can use it in the following ways:
Open the notebook on Google Colab
Please follow the walkthrough and enjoy the free GPU/TPU!
Tip
Hosting services on Google Colab is not recommended if your server aims to be long-lived or permanent. Colab is often used for quick experiments, demonstrations or leveraging its free GPU/TPU. For stable, secure and free hosting of your Flow, check out JCloud.
Export#
A Flow YAML can be exported as a Docker Compose YAML or Kubernetes YAML bundle.
Docker Compose#
from jina import Flow
f = Flow().add()
f.to_docker_compose_yaml()
jina export docker-compose flow.yml docker-compose.yml
This will generate a single docker-compose.yml file.
For advanced utilization of Docker Compose with Jina, refer to the How-to guide.
Kubernetes#
from jina import Flow
f = Flow().add()
f.to_kubernetes_yaml('flow_k8s_configuration')
jina export kubernetes flow.yml ./my-k8s
The generated folder can be used directly with kubectl to deploy the Flow to an existing Kubernetes cluster.
For advanced utilization of Kubernetes with Jina, please refer to the How-to guide.
Tip
Based on your local Jina version, Executor Hub may rebuild the Docker image during the YAML generation process.
If you do not wish to rebuild the image, set the environment variable JINA_HUB_NO_IMAGE_REBUILD.
Tip
If an Executor requires volumes to be mapped to persist data, Jina will create a StatefulSet for that Executor instead of a Deployment.
You can control the access mode, storage class name and capacity of the attached Persistent Volume Claim with the Jina environment variables JINA_K8S_ACCESS_MODES, JINA_K8S_STORAGE_CLASS_NAME and JINA_K8S_STORAGE_CAPACITY. Only the first volume will be considered for mounting.
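For instance, a minimal sketch of setting these variables before exporting (the values shown are placeholders; check which formats your Jina version accepts):

import os
from jina import Flow

# Placeholder values; verify the accepted formats for your Jina version
os.environ['JINA_K8S_ACCESS_MODES'] = 'ReadWriteOnce'
os.environ['JINA_K8S_STORAGE_CLASS_NAME'] = 'standard'
os.environ['JINA_K8S_STORAGE_CAPACITY'] = '10Gi'

f = Flow().add()
f.to_kubernetes_yaml('flow_k8s_configuration')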
See also
For more in-depth guides on Flow deployment, check our how-tos for Docker Compose and Kubernetes.
Caution
The port or ports arguments are ignored when exporting to Kubernetes YAML. Jina starts the services binding to port 8080, and when multiple protocols need to be served, the consecutive ports (8081, …) are used. This is because the Kubernetes service directs the traffic for you: in Kubernetes, services communicate via service names, irrespective of the internal port.
Add Executors#
Important
This section covers Flow-specific considerations when working with Executors. For more information, see the documentation on working with Executors.
A Flow orchestrates its Executors as a graph and sends requests to all Executors in the order specified by add() or listed in a YAML file.
When you start a Flow, each Executor always runs in its own separate process. Multiprocessing is the lowest level of separation when you run a Flow locally. When running a Flow on Kubernetes, Docker Swarm or Jina AI Cloud Hosting, different Executors run in different containers, pods or instances.
Executors can be added into a Flow with add().
from jina import Flow
f = Flow().add()
This adds an “empty” Executor called BaseExecutor to the Flow. This Executor (without any parameters) performs no actions.
To more easily identify an Executor, you can change its name by passing the name parameter:
from jina import Flow
f = Flow().add(name='myVeryFirstExecutor').add(name='secondIsBest')
You can also define the above Flow in YAML:
jtype: Flow
executors:
  - name: myVeryFirstExecutor
  - name: secondIsBest
Save it as flow.yml and run it:
jina flow --uses flow.yml
More Flow YAML specifications can be found in Flow YAML Specification.
How Executors process Documents in a Flow#
Let’s understand how Executors process Documents inside a Flow, and how changes are chained and applied, affecting downstream Executors in the Flow.
from jina import Executor, requests, Flow
from docarray import DocList, BaseDoc
from docarray.documents import TextDoc
class PrintDocuments(Executor):
    @requests
    def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            print(f' PrintExecutor: received document with text: "{doc.text}"')
        return docs

class ProcessDocuments(Executor):
    @requests(on='/change_in_place')
    def in_place(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        # This Executor only works on `docs` and doesn't consider any other arguments
        for doc in docs:
            print(f'ProcessDocuments: received document with text "{doc.text}"')
            doc.text = 'I changed the executor in place'

    @requests(on='/return_different_docarray')
    def ret_docs(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        # This executor only works on `docs` and doesn't consider any other arguments
        ret = DocList[TextDoc]()
        for doc in docs:
            print(f'ProcessDocuments: received document with text: "{doc.text}"')
            ret.append(TextDoc(text='I returned a different Document'))
        return ret

f = Flow().add(uses=ProcessDocuments).add(uses=PrintDocuments)

with f:
    f.post(on='/change_in_place', inputs=DocList[TextDoc]([TextDoc(text='request1')]), return_type=DocList[TextDoc])
    f.post(on='/return_different_docarray', inputs=DocList[TextDoc]([TextDoc(text='request2')]), return_type=DocList[TextDoc])
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:58746 │
│ 🔒 Private 192.168.1.187:58746 │
│ 🌍 Public 212.231.186.65:58746 │
╰──────────────────────────────────────────╯
ProcessDocuments: received document with text "request1"
PrintExecutor: received document with text: "I changed the executor in place"
ProcessDocuments: received document with text: "request2"
PrintExecutor: received document with text: "I returned a different Document"
Define topologies over Executors#
Flows are not restricted to sequential execution. Internally they are modeled as graphs, so they can represent any complex, non-cyclic topology.
A typical use case for such a Flow is a topology with a common pre-processing part, but different indexers separating embeddings and data.
To define a custom topology you can use the needs keyword when adding an Executor. By default, a Flow assumes that every Executor needs the previously added Executor.
from jina import Executor, requests, Flow
from docarray import DocList
from docarray.documents import TextDoc
class FooExecutor(Executor):
    @requests
    async def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        docs.append(TextDoc(text=f'foo was here and got {len(docs)} document'))

class BarExecutor(Executor):
    @requests
    async def bar(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        docs.append(TextDoc(text=f'bar was here and got {len(docs)} document'))

class BazExecutor(Executor):
    @requests
    async def baz(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        docs.append(TextDoc(text=f'baz was here and got {len(docs)} document'))

class MergeExecutor(Executor):
    @requests
    async def merge(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        return docs

f = (
    Flow()
    .add(uses=FooExecutor, name='fooExecutor')
    .add(uses=BarExecutor, name='barExecutor', needs='fooExecutor')
    .add(uses=BazExecutor, name='bazExecutor', needs='fooExecutor')
    .add(uses=MergeExecutor, needs=['barExecutor', 'bazExecutor'])
)
When sending a message to this Flow:
with f:
    print(f.post('/', return_type=DocList[TextDoc]).text)
This gives the output:
['foo was here and got 0 document', 'bar was here and got 1 document', 'baz was here and got 1 document']
Both BarExecutor and BazExecutor only received a single Document from FooExecutor because they are run in parallel. The last Executor executor3 receives both DocLists and merges them automatically.
This automated merging can be disabled with no_reduce=True. This is useful for providing custom merge logic in a separate Executor. In this case, the last .add() call would look like .add(needs=['barExecutor', 'bazExecutor'], uses=CustomMergeExecutor, no_reduce=True). This feature requires Jina >= 3.0.2.
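A minimal sketch of what such a custom merging Executor could look like (the class name CustomMergeExecutor and its merging logic are illustrative; see Merging upstream Documents below for the full mechanism):

from jina import Executor, requests
from docarray import DocList
from docarray.documents import TextDoc

class CustomMergeExecutor(Executor):
    @requests
    def merge(self, docs: DocList[TextDoc], docs_matrix, **kwargs) -> DocList[TextDoc]:
        # With no_reduce=True the upstream DocLists arrive separately in docs_matrix,
        # so this Executor concatenates them itself
        merged = DocList[TextDoc]()
        for upstream_docs in docs_matrix or []:
            merged.extend(upstream_docs)
        return merged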
Chain Executors in Flow with different schemas#
When using docarray>=0.30.0, you should ensure when building a Flow that the Document types used as input to an Executor match the schema of the output of the preceding Executor in the Flow.
For instance, the first example below chains the Document types correctly, while the second will fail to start because the types are wrongly chained.
from jina import Executor, requests, Flow
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
import numpy as np
class SimpleStrDoc(BaseDoc):
    text: str

class TextWithEmbedding(SimpleStrDoc):
    embedding: NdArray

class TextEmbeddingExecutor(Executor):
    @requests(on='/foo')
    def foo(self, docs: DocList[SimpleStrDoc], **kwargs) -> DocList[TextWithEmbedding]:
        ret = DocList[TextWithEmbedding]()
        for doc in docs:
            ret.append(TextWithEmbedding(text=doc.text, embedding=np.random.rand(10)))
        return ret

class ProcessEmbedding(Executor):
    @requests(on='/foo')
    def foo(self, docs: DocList[TextWithEmbedding], **kwargs) -> DocList[TextWithEmbedding]:
        for doc in docs:
            self.logger.info(f'Getting embedding with shape {doc.embedding.shape}')

flow = Flow().add(uses=TextEmbeddingExecutor, name='embed').add(uses=ProcessEmbedding, name='process')

with flow:
    flow.block()
from jina import Executor, requests, Flow
from docarray import DocList, BaseDoc
from docarray.typing import NdArray
import numpy as np
class SimpleStrDoc(BaseDoc):
    text: str

class TextWithEmbedding(SimpleStrDoc):
    embedding: NdArray

class TextEmbeddingExecutor(Executor):
    @requests(on='/foo')
    def foo(self, docs: DocList[SimpleStrDoc], **kwargs) -> DocList[TextWithEmbedding]:
        ret = DocList[TextWithEmbedding]()
        for doc in docs:
            ret.append(TextWithEmbedding(text=doc.text, embedding=np.random.rand(10)))
        return ret

class ProcessText(Executor):
    @requests(on='/foo')
    def foo(self, docs: DocList[SimpleStrDoc], **kwargs) -> DocList[TextWithEmbedding]:
        for doc in docs:
            self.logger.info(f'Getting embedding with type {doc.text}')

# This Flow will fail to start because the input type of "process" does not match the output type of "embed"
flow = Flow().add(uses=TextEmbeddingExecutor, name='embed').add(uses=ProcessText, name='process')

with flow:
    flow.block()
Jina is also compatible with docarray<0.30. When using that version, only a single Document schema existed (equivalent to LegacyDocument in docarray>=0.30), so there were no explicit compatibility issues between schemas. However, the complexity was implicitly there: an Executor may expect a Document to be filled with text and only fail at runtime.
Floating Executors#
Some Executors in your Flow can be used for asynchronous background tasks that take time and don’t generate a required output. For instance, logging specific information in external services, storing partial results, etc.
You can unblock your Flow from such tasks by using floating Executors.
Normally, all Executors form a pipeline that handles and transforms a given request until it is finally returned to the Client.
However, floating Executors do not feed their outputs back into the pipeline. Therefore, the Executor’s output does not affect the response for the Client, and the response can be returned without waiting for the floating Executor to complete its task.
Those Executors are marked with the floating keyword when added to a Flow:
import time
from jina import Executor, requests, Flow
from docarray import DocList
from docarray.documents import TextDoc
class FastChangingExecutor(Executor):
    @requests()
    def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            doc.text = 'Hello World'

class SlowChangingExecutor(Executor):
    @requests()
    def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        time.sleep(2)
        print(f' Received {docs.text}')
        for doc in docs:
            doc.text = 'Change the document but will not affect response'

f = (
    Flow()
    .add(name='executor0', uses=FastChangingExecutor)
    .add(
        name='floating_executor',
        uses=SlowChangingExecutor,
        needs=['gateway'],
        floating=True,
    )
)

with f:
    f.post(on='/endpoint', inputs=DocList[TextDoc]([TextDoc()]), return_type=DocList[TextDoc])  # we need to send a first request
    start_time = time.time()
    response = f.post(on='/endpoint', inputs=DocList[TextDoc]([TextDoc(), TextDoc()]), return_type=DocList[TextDoc])
    end_time = time.time()
    print(f' Response time took {end_time - start_time}s')
    print(f' {response.text}')
Response time took 0.011997222900390625s
['Hello World', 'Hello World']
Received ['Hello World', 'Hello World']
In this example the response is returned without waiting for the floating Executor to complete. However, the Flow is not closed until the floating Executor has handled the request.
You can plot the Flow and see the Executor is floating disconnected from the Gateway.
A floating Executor can never come before a non-floating Executor in your Flow’s topology.
This leads to the following behaviors:
Implicit reordering: When you add a non-floating Executor after a floating Executor without specifying its needs parameter, the non-floating Executor is chained after the previous non-floating one.
from jina import Flow
f = Flow().add().add(name='middle', floating=True).add()
f.plot()
Chaining floating Executors: To chain more than one floating Executor, you need to add all of them with the floating flag, and explicitly specify the needs argument.
from jina import Flow
f = Flow().add().add(name='middle', floating=True).add(needs=['middle'], floating=True)
f.plot()
Overriding the floating flag: If you add a floating Executor as part of the needs parameter of a non-floating Executor, then the floating Executor is no longer considered floating.
from jina import Flow
f = Flow().add().add(name='middle', floating=True).add(needs=['middle'])
f.plot()
Add Conditioning#
Sometimes you may not want all Documents to be processed by all Executors. For example when you process text and image Documents you want to forward them to different Executors depending on their data type.
You can set conditioning for every Executor in the Flow. Documents that don’t meet the condition will be removed before reaching that Executor. This allows you to build a selection control in the Flow.
Define conditions#
To add a condition to an Executor, pass it to the when parameter of the add() method of the Flow. This then defines when a Document is processed by the Executor:
You can use the MongoDB query language supported by docarray to specify a filter condition for each Executor.
from jina import Flow
f = Flow().add(when={'tags__key': {'$eq': 5}})
Then only Documents that satisfy the when condition will reach the associated Executor. Any Documents that don’t satisfy that condition won’t reach the Executor.
If you are trying to separate Documents according to the data modality they hold, you need to choose a condition accordingly.
See Also
In addition to $exists you can use a number of other operators to define your filter: $eq, $gte, $lte, $size, $and, $or and many more. For details, consult the MongoDB query language and the docarray documentation.
# define filter conditions
text_condition = {'text': {'$exists': True}}
tensor_condition = {'tensor': {'$exists': True}}
These conditions specify that only Documents that hold data of a specific modality can pass the filter.
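A minimal sketch of routing by modality with these conditions (the ModalityDoc schema and the TextEncoder/ImageEncoder Executors are hypothetical placeholders):

from typing import Optional
from jina import Flow, Executor, requests
from docarray import BaseDoc, DocList
from docarray.typing import NdArray

class ModalityDoc(BaseDoc):
    text: Optional[str] = None
    tensor: Optional[NdArray] = None

class TextEncoder(Executor):
    @requests
    def encode(self, docs: DocList[ModalityDoc], **kwargs) -> DocList[ModalityDoc]:
        print(f'TextEncoder got {len(docs)} Documents')

class ImageEncoder(Executor):
    @requests
    def encode(self, docs: DocList[ModalityDoc], **kwargs) -> DocList[ModalityDoc]:
        print(f'ImageEncoder got {len(docs)} Documents')

# Each Document only reaches the branch whose filter it satisfies
f = (
    Flow()
    .add(uses=TextEncoder, name='text_encoder', when={'text': {'$exists': True}})
    .add(uses=ImageEncoder, name='image_encoder', when={'tensor': {'$exists': True}}, needs='gateway')
)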
from jina import Flow, Executor, requests
from docarray import DocList, BaseDoc
from typing import Dict
class MyDoc(BaseDoc):
    text: str = ''
    tags: Dict[str, int]

class MyExec(Executor):
    @requests
    def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        for doc in docs:
            print(f'{doc.tags}')

f = Flow().add(uses=MyExec).add(uses=MyExec, when={'tags__key': {'$eq': 5}})  # Create the Flow and add the condition

with f:  # Using it as a Context Manager starts the Flow
    ret = f.post(
        on='/search',
        inputs=DocList[MyDoc]([MyDoc(tags={'key': 5}), MyDoc(tags={'key': 4})]),
        return_type=DocList[MyDoc],
    )

for doc in ret:
    print(f'{doc.tags}')  # only the Document fulfilling the condition is processed and therefore returned.
{'key': 5.0}
jtype: Flow
executors:
  - name: executor
    uses: MyExec
    when:
      tags__key:
        $eq: 5
from jina import Flow
f = Flow.load_config('flow.yml')  # Load the Flow definition from Yaml file

with f:  # Using it as a Context Manager starts the Flow
    ret = f.post(
        on='/search',
        inputs=DocList[MyDoc]([MyDoc(tags={'key': 5}), MyDoc(tags={'key': 4})]),
        return_type=DocList[MyDoc],
    )

for doc in ret:
    print(f'{doc.tags}')  # only the Document fulfilling the condition is processed and therefore returned.
{'key': 5.0}
Note that if a Document does not satisfy the when condition of a filter, the filter removes the Document for that entire branch of the Flow.
This means that every Executor located behind a filter is affected by this, not just the specific Executor that defines the condition.
As with a real-life filter, once something fails to pass through it, it no longer continues down the pipeline.
Naturally, parallel branches in a Flow do not affect each other. So if a Document gets filtered out in only one branch, it can still be used in the other branch, and also after the branches are re-joined:
from jina import Flow, Executor, requests
from docarray import DocList, BaseDoc
from typing import Dict
class MyDoc(BaseDoc):
    text: str = ''
    tags: Dict[str, int]

class MyExec(Executor):
    @requests
    def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        for doc in docs:
            print(f'{doc.tags}')

f = (
    Flow()
    .add(uses=MyExec, name='first')
    .add(uses=MyExec, when={'tags__key': {'$eq': 5}}, needs='first', name='exec1')
    .add(uses=MyExec, when={'tags__key': {'$eq': 4}}, needs='first', name='exec2')
    .needs_all(uses=MyExec, name='join')
)

with f:
    ret = f.post(
        on='/search',
        inputs=DocList[MyDoc]([MyDoc(tags={'key': 5}), MyDoc(tags={'key': 4})]),
        return_type=DocList[MyDoc],
    )

for doc in ret:
    print(f'{doc.tags}')
{'key': 5.0}
{'key': 4.0}
from jina import Flow, Executor, requests
from docarray import DocList, BaseDoc
from typing import Dict
class MyDoc(BaseDoc):
    text: str = ''
    tags: Dict[str, int]

class MyExec(Executor):
    @requests
    def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        for doc in docs:
            print(f'{doc.tags}')

f = (
    Flow()
    .add(uses=MyExec, name='first')
    .add(uses=MyExec, when={'tags__key': {'$eq': 5}}, name='exec1', needs='first')
    .add(uses=MyExec, when={'tags__key': {'$eq': 4}}, needs='exec1', name='exec2')
)

with f:
    ret = f.post(
        on='/search',
        inputs=DocList[MyDoc]([MyDoc(tags={'key': 5}), MyDoc(tags={'key': 4})]),
        return_type=DocList[MyDoc],
    )

for doc in ret:
    print(f'{doc.tags}')
This feature is useful to prevent some specialized Executors from processing certain Documents. It can also be used to build switch-like nodes, where some Documents pass through one branch of the Flow, while other Documents pass through a different parallel branch.
Note that whenever a Document does not satisfy the condition of an Executor, it is not even sent to that Executor. Instead, only a tailored Request without any payload is transferred. This means that you can not only use this feature to build complex logic, but also to minimize your networking overhead.
Merging upstream Documents#
Often when you’re building a Flow, you want an Executor to receive Documents from multiple upstream Executors.
For this you can use the docs_matrix or docs_map parameters (part of the Executor endpoint signature). These Flow-specific arguments can be used alongside an Executor’s default arguments:
from typing import Dict, Union, List, Optional
from jina import Executor, requests
from docarray import DocList
class MergeExec(Executor):
    @requests
    async def foo(
        self,
        docs: DocList[...],
        parameters: Dict,
        docs_matrix: Optional[List[DocList[...]]],
        docs_map: Optional[Dict[str, DocList[...]]],
    ) -> DocList[...]:
        pass
Use docs_matrix to receive a List of all incoming DocLists from upstream Executors:
[
    DocList[...](...),  # from Executor1
    DocList[...](...),  # from Executor2
    DocList[...](...),  # from Executor3
]
Use docs_map to receive a Dict, where each item’s key is the name of an upstream Executor and the value is the DocList coming from that Executor:
{
    'Executor1': DocList[...](...),
    'Executor2': DocList[...](...),
    'Executor3': DocList[...](...),
}
Reducing multiple DocLists to one DocList#
The no_reduce argument determines whether DocLists are reduced into one when being received:
To reduce all incoming DocLists into one single DocList, do not set no_reduce or set it to False. In this case docs_map and docs_matrix will be None.
To receive a list of all incoming DocLists, set no_reduce to True. The Executor will then receive the DocLists independently under docs_matrix and docs_map.
The example below uses no_reduce=True; a sketch of the default reduce behavior follows at the end of this section.
from jina import Flow, Executor, requests
from docarray import DocList, BaseDoc
class MyDoc(BaseDoc):
    text: str = ''

class Exec1(Executor):
    @requests
    def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        for doc in docs:
            doc.text = 'Exec1'

class Exec2(Executor):
    @requests
    def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        for doc in docs:
            doc.text = 'Exec2'

class MergeExec(Executor):
    @requests
    def foo(self, docs: DocList[MyDoc], docs_matrix, **kwargs) -> DocList[MyDoc]:
        documents_to_return = DocList[MyDoc]()
        for doc1, doc2 in zip(*docs_matrix):
            print(
                f'MergeExec processing pairs of Documents "{doc1.text}" and "{doc2.text}"'
            )
            documents_to_return.append(
                MyDoc(text=f'Document merging from "{doc1.text}" and "{doc2.text}"')
            )
        return documents_to_return

f = (
    Flow()
    .add(uses=Exec1, name='exec1')
    .add(uses=Exec2, name='exec2')
    .add(uses=MergeExec, needs=['exec1', 'exec2'], no_reduce=True)
)

with f:
    returned_docs = f.post(on='/', inputs=MyDoc(), return_type=DocList[MyDoc])
    print(f'Resulting documents {returned_docs[0].text}')
────────────────────────── 🎉 Flow is ready to serve! ──────────────────────────
╭────────────── 🔗 Endpoint ───────────────╮
│ ⛓ Protocol GRPC │
│ 🏠 Local 0.0.0.0:55761 │
│ 🔒 Private 192.168.1.187:55761 │
│ 🌍 Public 212.231.186.65:55761 │
╰──────────────────────────────────────────╯
MergeExec processing pairs of Documents "Exec1" and "Exec2"
Resulting documents Document merging from "Exec1" and "Exec2"
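For comparison, a minimal sketch of the default reduce behavior (the CollectExec Executor and the names exec1/exec2 are illustrative): without no_reduce, the downstream Executor receives one single, already-reduced DocList, and docs_matrix and docs_map are None.

from jina import Flow, Executor, requests
from docarray import DocList, BaseDoc

class MyDoc(BaseDoc):
    text: str = ''

class CollectExec(Executor):
    @requests
    def foo(self, docs: DocList[MyDoc], **kwargs) -> DocList[MyDoc]:
        # `docs` is a single DocList that already combines (and de-duplicates)
        # the outputs of both upstream branches
        print(f'CollectExec received {len(docs)} Documents')

f = (
    Flow()
    .add(name='exec1')
    .add(name='exec2', needs='gateway')
    .add(uses=CollectExec, needs=['exec1', 'exec2'])  # no no_reduce: DocLists are reduced into one
)

with f:
    f.post(on='/', inputs=MyDoc(), return_type=DocList[MyDoc])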
Visualize#
A Flow has a built-in .plot() function which can be used to visualize the Flow:
from jina import Flow
f = Flow().add().add()
f.plot('flow.svg')
from jina import Flow
f = Flow().add(name='e1').add(needs='e1').add(needs='e1')
f.plot('flow-2.svg')
You can also do it in the terminal:
jina export flowchart flow.yml flow.svg
You can also visualize a remote Flow by passing the URL to jina export flowchart.
Logging#
The default jina.logging.logger.JinaLogger uses rich console logging that writes to the system console. The log_config argument can be used to pass in either the name of one of Jina’s pre-configured logging configurations or the absolute YAML file path of a custom logging configuration. For most cases, the default logging configuration sufficiently covers local, Docker and Kubernetes environments.
Custom logging handlers can be configured by following the examples in the official Python Logging Cookbook. An example custom logging configuration, defined in a YAML file logging.json.yml, is:
handlers:
  - StreamHandler
level: INFO
configs:
  StreamHandler:
    format: '%(asctime)s:{name:>15}@%(process)2d[%(levelname).1s]:%(message)s'
    formatter: JsonFormatter
The logging configuration can be used as follows:
from jina import Flow
f = Flow(log_config='./logging.json.yml')
jtype: Flow
with:
  log_config: './logging.json.yml'
Custom logging configuration#
The default logging or custom logging configuration at the Flow level will be propagated to the Gateway and Executor entities. If that is not desired, every Gateway or Executor entity can be provided with its own custom logging configuration.
You can configure two different Executors as in the below example:
from jina import Flow
f = (
    Flow().add(log_config='./logging.json.yml').add(log_config='./logging.file.yml')
) # Create a Flow with two Executors
logging.file.yml is another YAML file with a custom FileHandler configuration.
Hint
Refer to the Gateway logging configuration section for configuring the Gateway logging.
Caution
When exporting the Flow to Kubernetes, the log_config file path must refer to the absolute local path of each container. The custom logging file must be included during the containerization process. If the availability of the file is unknown, it is best to rely on the default configuration. This restriction also applies to dockerized Executors. When running a dockerized Executor locally, the logging configuration file can be mounted using volumes.
Methods#
The most important methods of the Flow object are the following:

| Method | Description |
|---|---|
| add() | Adds an Executor to the Flow. |
| start() | Starts the Flow. This will start all its Executors and check if they are ready to be used. |
| close() | Stops and closes the Flow. This will stop and shutdown all its Executors. |
| with context manager | Uses the Flow as a context manager. It will automatically start and stop your Flow. |
| plot() | Visualizes the Flow. Helpful for building complex pipelines. |
| post() | Sends requests to the Flow API. |
| block() | Blocks execution until the program is terminated. This is useful to keep the Flow alive so it can be used from other places (clients, etc). |
| to_docker_compose_yaml() | Generates a Docker Compose file listing all Executors as services. |
| to_kubernetes_yaml() | Generates Kubernetes configuration files in the given directory. |
| is_flow_ready() | Checks if the Flow is ready to process requests. Returns a boolean indicating the readiness. |
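A small sketch tying a few of these methods together (using only methods shown in this section; the Executor-less Flow is just for illustration):

from jina import Flow
from docarray import BaseDoc, DocList

f = Flow().add()           # add() an (empty) Executor
f.plot('flow.svg')         # plot() the topology without starting anything

f.start()                  # start() the Gateway and all Executors
f.post(on='/', inputs=BaseDoc(), return_type=DocList[BaseDoc])  # post() a request
f.close()                  # close() stops and shuts everything down

# equivalent, letting the context manager call start()/close() for you:
with Flow().add() as f:
    f.post(on='/', inputs=BaseDoc(), return_type=DocList[BaseDoc])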