Scale Out#
By default, all Executors in an Orchestration run with a single instance. If an Executor is particularly slow, it reduces the overall throughput. To solve this, you can specify the number of replicas to scale out an Executor.
Replicate stateless Executors#
Replication creates multiple copies of the same Executor. Each request in the Orchestration is then passed to only one replica (instance) of that Executor. All replicas compete for requests; an idle replica receives the request first.
This is useful for improving performance and availability:
- If you have slow Executors (e.g. embedding), you can scale up the number of instances to process multiple requests in parallel.
- Executors might need to be taken offline occasionally (for updates, failures, etc.), but you may want your Orchestration to keep processing requests without any downtime. Adding replicas allows any replica to be taken down as long as at least one is still running. This ensures the high availability of your Orchestration.
Replicate Executors in a Deployment#
Python:
from jina import Deployment

dep = Deployment(name='slow_encoder', replicas=3)

YAML:
jtype: Deployment
uses: jinaai://jina-ai/CLIPEncoder
install_requirements: True
replicas: 5
Replicate Executors in a Flow#
Python:
from jina import Flow

f = Flow().add(name='slow_encoder', replicas=3).add(name='fast_indexer')

YAML:
jtype: Flow
executors:
- uses: jinaai://jina-ai/CLIPEncoder
  install_requirements: True
  replicas: 5
Replicate stateful Executors with consensus using RAFT (Beta)#
Python 3.8 or newer required on macOS
This feature requires Python 3.8 or newer when working on macOS.
Feature not supported on Windows
This feature is not supported when using Windows.
DocArray 0.30
Starting from version 0.30, DocArray changed its interface and implementation drastically. We intend to support these new versions in the near future, but not every feature is yet available. Check here for more information. This feature has been added with the new DocArray support.
gRPC protocol
This feature is only available when using gRPC as the protocol for the Deployment, or when the Deployment is part of a Flow.
Replication is used to scale out Executors by creating copies of them that can handle requests in parallel, providing better RPS. However, when an Executor maintains some sort of state, it is not simple to guarantee that each copy of the Executor maintains the same state, which can lead to undesired behavior: each replica may return different results depending on the specific state it holds.
In Jina, you can also have replication while guaranteeing consensus between Executors. For this, we rely on RAFT, an algorithm that guarantees eventual consistency between replicas.
Consensus-based replication using RAFT is a distributed algorithm designed to provide fault tolerance and consistency in a distributed system. In a distributed system, nodes may fail, and messages may be lost or delayed, which can lead to inconsistencies. The problem with traditional replication methods is that they can't guarantee consistency in the presence of failures. This is where consensus-based replication using RAFT comes in. With this approach, each Executor can be considered a finite state machine, meaning it has a set of potential states and a set of transitions it can make between those states. Each request sent to the Executor can be considered a log entry that needs to be replicated across the cluster.
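To make the state-machine framing concrete, here is a toy sketch in plain Python (illustrative only, not Jina's or RAFT's actual code): every write request becomes a log entry, and replicas converge because they apply the same log in the same order.

# Toy illustration of state-machine replication; not Jina's or RAFT's internals.
class ToyReplica:
    """A replica is a state machine: the same log, applied in order, yields the same state."""

    def __init__(self):
        self.state = {}

    def apply(self, entry: dict):
        # Each log entry describes one state transition, e.g. storing a document.
        self.state[entry['doc_id']] = entry['text']


# The consensus module's job is to deliver the same log, in the same order, to every replica.
log = [{'doc_id': 'a', 'text': 'hello'}, {'doc_id': 'b', 'text': 'world'}]
replicas = [ToyReplica() for _ in range(3)]
for entry in log:
    for replica in replicas:
        replica.apply(entry)

assert all(r.state == replicas[0].state for r in replicas)  # all replicas hold the same state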
To enable this kind of replication, you need to:
- Specify which methods of the Executor can update its internal state (by marking them with the write decorator).
- Tell the Deployment to use the RAFT consensus algorithm by setting the --stateful argument.
- Set a number of replicas compatible with RAFT. RAFT requires at least three replicas to guarantee consistency.
- Pass the --peer-ports argument so that the RAFT cluster can recover from a previous configuration of replicas, if one existed.
- Optionally, pass the --raft-configuration parameter to tweak the behavior of the consensus module. The accepted values are described in Hashicorp's RAFT library.
from jina import Deployment, Executor, requests
from jina.serve.executors.decorators import write
from docarray import DocList
from docarray.documents import TextDoc


class MyStateStatefulExecutor(Executor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._docs_dict = {}

    @requests(on=['/index'])
    @write
    def index(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            self._docs_dict[doc.id] = doc

    @requests(on=['/search'])
    def search(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        for doc in docs:
            self.logger.debug(f'Searching against {len(self._docs_dict)} documents')
            doc.text = self._docs_dict[doc.id].text


d = Deployment(name='stateful_executor',
               uses=MyStateStatefulExecutor,
               replicas=3,
               stateful=True,
               workspace='./raft',
               peer_ports=[12345, 12346, 12347])

with d:
    d.block()
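If you want to tweak the consensus module, you can additionally pass the --raft-configuration option mentioned above. A minimal sketch of what this could look like, assuming the CLI flag maps to a raft_configuration keyword argument and that the key names below are among those accepted by Hashicorp's RAFT library (check its documentation for the authoritative list):

# Hedged sketch: the raft_configuration keys/values below are assumptions for illustration;
# consult Hashicorp's RAFT library documentation for the accepted options.
d = Deployment(
    name='stateful_executor',
    uses=MyStateStatefulExecutor,
    replicas=3,
    stateful=True,
    workspace='./raft',
    peer_ports=[12345, 12346, 12347],
    raft_configuration={
        'snapshot_interval': 20,  # assumed: seconds between snapshot checks
        'snapshot_threshold': 5,  # assumed: committed entries before a snapshot is taken
        'trailing_logs': 10,      # assumed: log entries kept after a snapshot
        'LogLevel': 'INFO',       # assumed: verbosity of the consensus module
    },
)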
This capability not only gives you replicas that work with robustness and availability; in some cases it can also help achieve higher throughput.
Let’s imagine we write an Executor that is used to index and query documents from a vector index.
For this, we will use an in-memory solution from DocArray that performs exact vector search.
from jina import Deployment, Executor, requests
from jina.serve.executors.decorators import write
from docarray import DocList
from docarray.documents import TextDoc
from docarray.index.backends.in_memory import InMemoryExactNNIndex


class QueryDoc(TextDoc):
    matches: DocList[TextDoc] = DocList[TextDoc]()


class ExactNNSearch(Executor):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._index = InMemoryExactNNIndex[TextDoc]()

    @requests(on=['/index'])
    @write  # the write decorator indicates that calling this endpoint updates the inner state
    def index(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        self.logger.info(f'Indexing Document in index with {len(self._index)} documents indexed')
        self._index.index(docs)

    @requests(on=['/search'])
    def search(self, docs: DocList[QueryDoc], **kwargs) -> DocList[QueryDoc]:
        self.logger.info(f'Searching Document in index with {len(self._index)} documents indexed')
        for query in docs:
            matches, scores = self._index.find(query, search_field='embedding', limit=100)
            query.matches = matches


d = Deployment(name='indexer',
               port=5555,
               uses=ExactNNSearch,
               workspace='./raft',
               replicas=3,
               stateful=True,
               peer_ports=[12345, 12346, 12347])

with d:
    d.block()
Then in another terminal, we will send index and search requests:
from jina import Client
from docarray import DocList
from docarray.documents import TextDoc
import time
import numpy as np


class QueryDoc(TextDoc):
    matches: DocList[TextDoc] = DocList[TextDoc]()


NUM_DOCS_TO_INDEX = 100000
NUM_QUERIES = 1000

c = Client(port=5555)

index_docs = DocList[TextDoc](
    [TextDoc(text=f'I am document {i}', embedding=np.random.rand(128)) for i in range(NUM_DOCS_TO_INDEX)])

start_indexing_time = time.time()
c.post(on='/index', inputs=index_docs, request_size=100)
print(f'Indexing {NUM_DOCS_TO_INDEX} Documents took {time.time() - start_indexing_time}s')

time.sleep(2)  # allow some time for the data to be replicated

search_da = DocList[QueryDoc](
    [QueryDoc(text=f'I am document {i}', embedding=np.random.rand(128)) for i in range(NUM_QUERIES)])

start_querying_time = time.time()
responses = c.post(on='/search', inputs=search_da, request_size=1)
print(f'Searching {NUM_QUERIES} Queries took {time.time() - start_querying_time}s')

for res in responses:
    print(f'{res.matches}')
In the server logs you can see that index requests reach every replica, while each search request reaches only one replica, in a round-robin fashion.
Eventually, every indexer replica ends up with the same Documents indexed.
INFO indexer/rep-2@923 Indexing Document in index with 99900 documents indexed
INFO indexer/rep-0@902 Indexing Document in index with 99200 documents indexed
INFO indexer/rep-1@910 Indexing Document in index with 99700 documents indexed
INFO indexer/rep-1@910 Indexing Document in index with 99800 documents indexed [04/28/23 16:51:06]
INFO indexer/rep-0@902 Indexing Document in index with 99300 documents indexed [04/28/23 16:51:06]
INFO indexer/rep-1@910 Indexing Document in index with 99900 documents indexed
INFO indexer/rep-0@902 Indexing Document in index with 99400 documents indexed
INFO indexer/rep-0@902 Indexing Document in index with 99500 documents indexed
INFO indexer/rep-0@902 Indexing Document in index with 99600 documents indexed
INFO indexer/rep-0@902 Indexing Document in index with 99700 documents indexed
INFO indexer/rep-0@902 Indexing Document in index with 99800 documents indexed
INFO indexer/rep-0@902 Indexing Document in index with 99900 documents indexed
At search time, however, the consensus module is not involved, and each query is served by only one replica.
INFO indexer/rep-0@902 Searching Document in index with 100000 documents indexed [04/28/23 16:59:21]
INFO indexer/rep-1@910 Searching Document in index with 100000 documents indexed [04/28/23 16:59:21]
INFO indexer/rep-2@923 Searching Document in index with 100000 documents indexed
If you run the same example with replicas set to 1 and without the consensus module, you can see the benefit replication brings to QPS at search time, at a small cost in indexing time.
d = Deployment(name='indexer',
               port=5555,
               uses=ExactNNSearch,
               workspace='./raft',
               replicas=1)
With one replica:
Indexing 100000 Documents took 18.93274688720703s
Searching 1000 Queries took 385.96641397476196s
With three replicas and consensus:
Indexing 100000 Documents took 35.066415548324585s
Searching 1000 Queries took 202.07950615882874s
This increases search QPS from roughly 2.5 (1,000 queries in ~386 s) to roughly 5 (1,000 queries in ~202 s).
Replicate on multiple GPUs#
To replicate your Executors so that each replica uses a different GPU on your machine, you can tell the Orchestration to use multiple GPUs by passing CUDA_VISIBLE_DEVICES=RR as an environment variable.
Caution
You should only replicate on multiple GPUs with CUDA_VISIBLE_DEVICES=RR locally.
Tip
In Kubernetes or with Docker Compose you should allocate GPU resources to each replica directly in the configuration files.
The Orchestration assigns GPU devices in the following round-robin fashion:
GPU device | Replica ID
---|---
0 | 0
1 | 1
2 | 2
0 | 3
1 | 4
You can restrict the visible devices in round-robin assignment using CUDA_VISIBLE_DEVICES=RR0:2, where 0:2 corresponds to a Python slice. This creates the following assignment:
GPU device | Replica ID
---|---
0 | 0
1 | 1
0 | 2
1 | 3
0 | 4
You can also restrict the visible devices in round-robin assignment by assigning a list of device IDs, as in CUDA_VISIBLE_DEVICES=RR1,3. This creates the following assignment:
GPU device | Replica ID
---|---
1 | 0
3 | 1
1 | 2
3 | 3
1 | 4
You can also refer to GPUs by their UUID. For instance, you could assign a list of device UUIDs:
CUDA_VISIBLE_DEVICES=RRGPU-0aaaaaaa-74d2-7297-d557-12771b6a79d5,GPU-0bbbbbbb-74d2-7297-d557-12771b6a79d5,GPU-0ccccccc-74d2-7297-d557-12771b6a79d5,GPU-0ddddddd-74d2-7297-d557-12771b6a79d5
Check the CUDA documentation to see the accepted formats for assigning CUDA devices by UUID.
GPU device | Replica ID
---|---
GPU-0aaaaaaa-74d2-7297-d557-12771b6a79d5 | 0
GPU-0bbbbbbb-74d2-7297-d557-12771b6a79d5 | 1
GPU-0ccccccc-74d2-7297-d557-12771b6a79d5 | 2
GPU-0ddddddd-74d2-7297-d557-12771b6a79d5 | 3
GPU-0aaaaaaa-74d2-7297-d557-12771b6a79d5 | 4
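The assignment in all of these tables is plain round-robin over the list of visible devices. Here is a minimal sketch of the mapping logic (not Jina's actual implementation; visible_devices is assumed to be already parsed from whatever follows RR):

# Sketch of round-robin GPU assignment; not Jina's actual implementation.
def assign_devices(visible_devices, num_replicas):
    """Map each replica ID to a device from the visible list in round-robin order."""
    return {replica_id: visible_devices[replica_id % len(visible_devices)]
            for replica_id in range(num_replicas)}


# Reproduces the CUDA_VISIBLE_DEVICES=RR1,3 table above for five replicas:
print(assign_devices(['1', '3'], num_replicas=5))
# {0: '1', 1: '3', 2: '1', 3: '3', 4: '1'}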
For example, if you have three GPUs and one of your Executors has five replicas, then:
GPU replicas in a Deployment#
from jina import Deployment

dep = Deployment(uses='jinaai://jina-ai/CLIPEncoder', replicas=5, install_requirements=True)

with dep:
    dep.block()
CUDA_VISIBLE_DEVICES=RR python deployment.py
jtype: Deployment
with:
  uses: jinaai://jina-ai/CLIPEncoder
  install_requirements: True
  replicas: 5
CUDA_VISIBLE_DEVICES=RR jina deployment --uses deployment.yaml
GPU replicas in a Flow#
from jina import Flow

f = Flow().add(
    uses='jinaai://jina-ai/CLIPEncoder', replicas=5, install_requirements=True
)

with f:
    f.block()
CUDA_VISIBLE_DEVICES=RR python flow.py
jtype: Flow
executors:
- uses: jinaai://jina-ai/CLIPEncoder
  install_requirements: True
  replicas: 5
CUDA_VISIBLE_DEVICES=RR jina flow --uses flow.yaml
Replicate external Executors#
If you have external Executors with multiple replicas running elsewhere, you can add them to your Orchestration by specifying all the respective hosts and ports:
from jina import Deployment
replica_hosts, replica_ports = ['localhost','91.198.174.192'], ['12345','12346']
Deployment(host=replica_hosts, port=replica_ports, external=True)
# alternative syntax
Deployment(host=['localhost:12345','91.198.174.192:12346'], external=True)
from jina import Flow
replica_hosts, replica_ports = ['localhost','91.198.174.192'], ['12345','12346']
Flow().add(host=replica_hosts, port=replica_ports, external=True)
# alternative syntax
Flow().add(host=['localhost:12345','91.198.174.192:12346'], external=True)
This connects to grpc://localhost:12345 and grpc://91.198.174.192:12346 as two replicas of the external Executor.
Reducing
If an external Executor needs multiple predecessors, reducing must be enabled, so setting no_reduce=True is not allowed in these cases.
Customize polling behaviors#
Replicas compete for a request, so only one of them will get it. What if we want all of them to get the request?
For example, consider index and search requests:
- Index (and update, delete) requests are handled by a single shard, as this is sufficient to store the data once.
- Search requests are handled by all shards, as you need to search over all shards to ensure the completeness of the result: the requested data could be on any shard.
For this purpose, you need shards and polling.
You can define whether all or any shards receive the request by specifying polling. ANY means only one shard receives the request, while ALL means that all shards receive the same request.
from jina import Deployment
dep = Deployment(name='ExecutorWithShards', shards=3, polling={'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'})
from jina import Flow
f = Flow().add(name='ExecutorWithShards', shards=3, polling={'/custom': 'ALL', '/search': 'ANY', '*': 'ANY'})
The above example results in an Orchestration with the Executor ExecutorWithShards and the following polling options:
- /index has polling ANY (the default value is not changed here).
- /search has polling ANY as it is explicitly set (usually that should not be necessary).
- /custom has polling ALL.
- All other endpoints have polling ANY due to using * as a wildcard to catch all other cases.
Understand behaviors of replicas and shards with polling#
The following example demonstrates the different behaviors when setting replicas, shards and polling together.
from jina import Deployment, Executor, requests
from docarray import DocList
from docarray.documents import TextDoc


class MyExec(Executor):
    @requests
    def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        print(f'inside: {docs.text}')


dep = Deployment(uses=MyExec, replicas=2, polling='ANY')

with dep:
    r = dep.post('/', TextDoc(text='hello'), return_type=DocList[TextDoc])
    print(f'return: {r.text}')
from jina import Flow, Executor, requests
from docarray import DocList
from docarray.documents import TextDoc


class MyExec(Executor):
    @requests
    def foo(self, docs: DocList[TextDoc], **kwargs) -> DocList[TextDoc]:
        print(f'inside: {docs.text}')


f = (
    Flow()
    .add(uses=MyExec, replicas=2, polling='ANY')
    .needs_all()
)

with f:
    r = f.post('/', TextDoc(text='hello'), return_type=DocList[TextDoc])
    print(f'return: {r.text}')
You can change the combination of replicas, shards and polling in the snippets above and observe whether the console output differs (note the two print statements in each snippet).