Elasticsearch/dataflow - Connection Timeout After ~60 Concurrent Connection
Solution 1:
I did a little research into the Elasticsearch connector. There are two principles you may want to apply to make your connector as efficient as possible.
Note: Setting a maximum number of workers, as suggested in the other answer, will probably not help much for now. Let's first improve utilization of your Beam/Elastic cluster resources; if we start hitting limits for either, we can then consider restricting the number of workers. For now, you can try to improve your connector.
Using bulk requests to external services
The code you provided issues an individual search request for every element coming into the DoFn. As you've noted, this works, but your pipeline will spend too much time waiting on an external request for each element, so the number of round trips you wait for is O(n).
Fortunately, the Elasticsearch client has an msearch method, which should allow you to perform searches in bulk. You can do something like this:
    import json

    import apache_beam as beam

    class PredictionFn(beam.DoFn):
        def __init__(self, ...):
            self.buffer = []
            ...

        def process(self, element):
            self.buffer.append(element)
            if len(self.buffer) > BATCH_SIZE:
                return self.flush()

        def flush(self):
            result = []
            # Perform the search requests for user ids
            user_ids = [uid for cid, did, uid in self.buffer]
            user_ids_request = self._build_uid_reqs(user_ids)
            resp = es.msearch(body=user_ids_request)

            user_id_and_device_id_lists = []
            for r, elm in zip(resp['responses'], self.buffer):
                if len(r["hits"]["hits"]) == 0:
                    continue
                # Get new device_id_list
                user_id_and_device_id_lists.append((elm[2],  # User ID
                                                    device_id_list))

            device_id_lists = [elm[1] for elm in user_id_and_device_id_lists]
            device_ids_request = self._build_device_id_reqs(device_id_lists)
            resp = es.msearch(body=device_ids_request)
            # The two msearch calls replace the per-element request:
            #   self.elasticsearch.search(index="sessions", body={"query": {"match": {"userId": user_id}}})

            # Handle the result, output anything necessary
            self.buffer = []  # start a fresh batch so elements are not searched twice
            return result

        def _build_uid_reqs(self, uids):
            # Relying on this answer: https://stackoverflow.com/questions/28546253/how-to-create-request-body-for-python-elasticsearch-msearch/37187352
            res = []
            for uid in uids:
                res.append(json.dumps({'index': 'sessions'}))  # Request HEAD
                res.append(json.dumps({"query": {"match": {"userId": uid}}}))  # Request BODY
            # msearch bodies are newline-delimited JSON and must end with a newline
            return '\n'.join(res) + '\n'
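As a rough sketch of the batching bookkeeping in the snippet above, here is a version that can be exercised without Beam or a live cluster. The helper names (`build_msearch_body`, `pair_hits`, `chunked`) are hypothetical and only for illustration; the `sessions` index and `userId` field are taken from the snippet above. It relies on msearch returning one entry in `responses` per request, in request order.

```python
import json


def build_msearch_body(user_ids, index="sessions"):
    """Return a newline-delimited JSON body with one header/query pair per user ID."""
    lines = []
    for uid in user_ids:
        lines.append(json.dumps({"index": index}))                       # request header
        lines.append(json.dumps({"query": {"match": {"userId": uid}}}))  # request body
    # The multi-search API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


def pair_hits(elements, msearch_response):
    """Pair each buffered element with its hits, skipping elements without hits.

    Entries that carry an error instead of hits are skipped as well.
    """
    paired = []
    for element, resp in zip(elements, msearch_response["responses"]):
        hits = resp.get("hits", {}).get("hits", [])
        if hits:
            paired.append((element, hits))
    return paired


def chunked(items, size):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

In a real DoFn the buffer may never reach the batch size, so leftover elements would also need to be flushed at the end of each bundle; Beam's `finish_bundle` hook is the usual place for that.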
Reusing the client as it's thread-safe
The Elasticsearch client is also thread-safe! So rather than creating a new one every time, you can do something like this:
    class PredictionFn(beam.DoFn):
        CLIENT = None

        def init_elasticsearch(self):
            if PredictionFn.CLIENT is not None:
                return PredictionFn.CLIENT
            es_host = fetch_host()
            http_auth = fetch_auth()
            PredictionFn.CLIENT = Elasticsearch([es_host], http_auth=http_auth,
                                                timeout=300, sniff_on_connection_fail=True,
                                                retry_on_timeout=True, max_retries=2,
                                                maxsize=5)  # 5 connections per client
            return PredictionFn.CLIENT
This should ensure that you keep a single client for each worker, and you won't be creating so many connections to ElasticSearch - and thus not getting the rejection messages.
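One caveat with a class-level `CLIENT` is that two threads can both see `None` at the same moment and each build a client. A minimal sketch of a lock-guarded variant follows; `SharedClient` and its `factory` argument are hypothetical names, and the factory stands in for whatever constructs the real Elasticsearch client.

```python
import threading


class SharedClient:
    """Process-wide holder that builds the client once and returns the same object."""

    _client = None
    _lock = threading.Lock()

    @classmethod
    def get(cls, factory):
        # Fast path: the client already exists, so no locking is needed.
        if cls._client is None:
            with cls._lock:
                # Re-check inside the lock so only one thread runs the factory.
                if cls._client is None:
                    cls._client = factory()
        return cls._client
```

Usage would look something like `es = SharedClient.get(lambda: Elasticsearch([es_host], http_auth=http_auth, maxsize=5))`, called from the DoFn (for example from Beam's `setup` hook) rather than once per element.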
Let me know if these two help, or if we need to try further improvements!
Solution 2:
EDIT: This was a red herring. CLOSE_WAIT is not related. I hit the same issue again, and this time most of the connections are in ESTABLISHED status :/
While both of the other answers are insightful, I don't think they answer the question.
After some more investigation, I found out that elasticsearch-py (or urllib3), in combination with Dataflow, somehow leaves connections in CLOSE_WAIT status. Once connections reach this status they get stuck (the OS will not release these sockets because it expects the application code to close them), so after the job has run for some time, all of the connections in my connection pool are in CLOSE_WAIT status and I cannot make any new connections. If I don't use a connection pool and instead instantiate an Elasticsearch client for each ParDo, it just gets worse; somehow the connections get stuck even faster.
I reported the issue here: https://github.com/elastic/elasticsearch-py/issues/1459 but honestly the issue seems to be deeper in the stack, because I had a similar issue when I directly used the requests package's connection pool (which I believe also uses urllib3 under the hood).
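For anyone who wants to check which state their sockets are in, here is a small diagnostic sketch, assuming a Linux worker where `/proc/net/tcp` is readable. The function name `count_tcp_states` and the `remote_port` filter are my own; the hex state codes are the ones the Linux kernel uses in that file.

```python
from collections import Counter

# Hex state codes used by the Linux kernel in /proc/net/tcp.
TCP_STATES = {
    "01": "ESTABLISHED", "02": "SYN_SENT", "03": "SYN_RECV", "04": "FIN_WAIT1",
    "05": "FIN_WAIT2", "06": "TIME_WAIT", "07": "CLOSE", "08": "CLOSE_WAIT",
    "09": "LAST_ACK", "0A": "LISTEN", "0B": "CLOSING",
}


def count_tcp_states(proc_net_tcp_text, remote_port=None):
    """Count sockets per TCP state, optionally only those talking to remote_port."""
    counts = Counter()
    for line in proc_net_tcp_text.splitlines()[1:]:  # first line is the column header
        fields = line.split()
        if len(fields) < 4:
            continue
        remote, state = fields[2], fields[3]
        if remote_port is not None:
            # The remote address looks like "0100007F:23F0"; the port is the hex part.
            if int(remote.rsplit(":", 1)[1], 16) != remote_port:
                continue
        counts[TCP_STATES.get(state.upper(), state)] += 1
    return counts
```

On a worker this could be called as `count_tcp_states(open("/proc/net/tcp").read(), remote_port=9200)`, where 9200 is only an example port; IPv6 sockets are listed separately in `/proc/net/tcp6`.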
Solution 3:
Dataflow has no limit on the number of outgoing connections. It uses a K8s cluster under the hood, and every Python thread lives in its own Docker container.
API calls to Elastic cloud are rate-limited (take a look at the x-rate-limit-{interval,limit,remaining} fields in the response headers).
With Dataflow it is very easy to hit API rate limits if you run a lot of parallel jobs and/or Google Cloud scales up the nodes of your job to make it faster.
Possible workarounds in your Dataflow / Apache Beam job:
1 - (no code required) Play with the Dataflow execution parameters (https://cloud.google.com/dataflow/docs/guides/specifying-exec-params) to limit the number of concurrent processing threads.
The three parameters you need to tweak are:
max_num_workers: maximum number of worker instances (machines) running.
number_of_worker_harness_threads: by default 1 thread per CPU your instance has.
machine_type: the instance type you will use.
2 - Implement rate limiting in your code. See "Timely (and Stateful) Processing with Apache Beam".
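To make the two workarounds concrete, here is a rough sketch. The three flag names are the ones listed above; the helper names, the example values, and the idea of splitting a cluster-wide request budget evenly across the worst-case number of threads are my own assumptions. The limiter is a simple per-thread minimum-interval limiter, not the stateful/timely Beam approach from the linked post, and it is not shared between threads.

```python
import time


def dataflow_throttle_flags(max_num_workers, threads_per_worker, machine_type):
    """Build the command-line flags that cap concurrency for a Dataflow job."""
    return [
        f"--max_num_workers={max_num_workers}",
        f"--number_of_worker_harness_threads={threads_per_worker}",
        f"--machine_type={machine_type}",
    ]


def per_thread_rate(global_requests_per_sec, max_num_workers, threads_per_worker):
    """Split a cluster-wide request budget across the worst-case thread count."""
    return global_requests_per_sec / (max_num_workers * threads_per_worker)


class MinIntervalLimiter:
    """Blocks so that consecutive acquire() calls are at least 1/rate seconds apart.

    Intended to be used as one instance per thread; it does no locking.
    """

    def __init__(self, rate_per_sec, clock=time.monotonic, sleep=time.sleep):
        self.interval = 1.0 / rate_per_sec
        self.clock = clock
        self.sleep = sleep
        self.next_allowed = clock()

    def acquire(self):
        now = self.clock()
        wait = self.next_allowed - now
        if wait > 0:
            self.sleep(wait)          # too early: wait until the slot opens
            now = self.next_allowed
        self.next_allowed = now + self.interval
```

Each thread would create its own `MinIntervalLimiter(per_thread_rate(...))` and call `acquire()` before every request to Elasticsearch; the flag list can be passed to the pipeline's options along with your other arguments.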