google app engine - Best practice to query large numbers of ndb entities from the datastore

I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard; it is just a histogram of the usage samples that pass a specific filter. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.

I have tried everything I can think of to chunk the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter what method I try (see below), the RPCs always seem to fall back to a waterfall of sequential next calls.

Note: the query and analysis code does work; it just runs too slowly because I can't get data back from the datastore quickly enough.

Background

I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:

class Session(ndb.Model):
   """ A tracked user session. (customer account (company), version, OS, etc) """
   data = ndb.JsonProperty(required = False, indexed = False)

class Sample(ndb.Model):
   name      = ndb.StringProperty  (required = True,  indexed = True)
   session   = ndb.KeyProperty     (required = True,  kind = Session)
   timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
   tags      = ndb.StringProperty  (repeated = True,  indexed = True)

You can think of the samples as times when a user makes use of a capability of a given name (e.g. 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature, e.g. ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']. So the tags form a denormalized set of tokens that could be used to find samples of interest.

The analysis I am trying to do consists of taking a date range and asking how many times a feature or set of features (perhaps all features) was used per day (or per hour) per customer account (company, not per user).

So the input to the handler would be something like:

  • Start Date
  • End Date
  • Tag(s)

Output would be:

[{
   'company_account': <string>,
   'counts': [
      {'timeperiod': <iso8601 date>, 'count': <int>}, ...
   ]
 }, ...
]

Common Code for Queries

Here is some code common to all of the queries. The general structure of the handler is a simple webapp2 get handler that sets up the query parameters, runs the query, processes the results, and creates the data to return.

# -- Build Query Object --- #
query_opts = {}
query_opts['batch_size'] = 500   # Bring in large groups of entities

q = Sample.query()
q = q.order(Sample.timestamp)

# Tags
tag_args = [(Sample.tags == t) for t in tags]
q = q.filter(ndb.query.AND(*tag_args))

def handle_sample(sample):
   session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
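getPeriodTimestamp and addCountForPeriod are referenced above but not shown in the question; a minimal sketch of what they might look like, assuming daily buckets and a simple in-memory dict keyed by customer (the actual implementations may differ):

from collections import defaultdict

# {customer: {iso8601 day: count}}, accumulated while iterating over samples
period_counts = defaultdict(lambda: defaultdict(int))

def getPeriodTimestamp(timestamp):
   """ Truncate a sample timestamp to its reporting period (assumed: one day). """
   return timestamp.date().isoformat()

def addCountForPeriod(count_key, timestamp):
   """ Increment the histogram bucket for this customer account and period. """
   period_counts[count_key][getPeriodTimestamp(timestamp)] += 1

def build_output():
   """ Convert the accumulated counts into the output structure shown earlier. """
   return [{
      'company_account': customer,
      'counts': [{'timeperiod': ts, 'count': n}
                 for ts, n in sorted(counts.items())]
   } for customer, counts in period_counts.items()]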

Methods Tried

I have tried a variety of methods to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:

A. Single Iteration

This is a simple base case to compare against the other methods. I just build the query and iterate over all the items, letting ndb pull them one after the other.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
q_iter = q.iter(**query_opts)

for sample in q_iter:
   handle_sample(sample)

B. Large Fetch

The idea here was to see if I could do a single very large fetch.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)
samples = q.fetch(20000, **query_opts)

for sample in samples:
   handle_sample(sample)

C. Async fetches across time range

The idea here is that the samples are fairly evenly spread across time, so I can create a set of independent queries that split the overall time range into chunks and run each of them in parallel using async:

# split up timestamp space into 20 equal parts and async query each of them
ts_intervals   = 20
ts_delta       = (end_time - start_time) / ts_intervals
cur_start_time = start_time
q_futures = []

for x in range(ts_intervals):
   cur_end_time = (cur_start_time + ts_delta)
   if x == (ts_intervals-1):    # Last one has to cover full range
      cur_end_time = end_time

   f = q.filter(Sample.timestamp >= cur_start_time,
                Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
   q_futures.append(f)
   cur_start_time = cur_end_time

# Now loop through and collect results
for f in q_futures:
   samples = f.get_result()
   for sample in samples:
      handle_sample(sample)

D. Async mapping

I tried this method because the documentation made it sound like ndb may exploit some parallelism automatically when using the Query.map_async method.

q = q.filter(Sample.timestamp >= start_time)
q = q.filter(Sample.timestamp <= end_time)

@ndb.tasklet
def process_sample(sample):
   period_ts   = getPeriodTimestamp(sample.timestamp)
   session_obj = yield sample.session.get_async()    # Lookup the session object from cache
   count_key   = session_obj.data['customer']
   addCountForPeriod(count_key, sample.timestamp)
   raise ndb.Return(None)

q_future = q.map_async(process_sample, **query_opts)
res = q_future.get_result()

Outcome

I tested out one example query to collect overall response time and appstats traces. The results are:

A. Single Iteration

real: 15.645s

This one fetches batches sequentially, one after the other, and then retrieves every session from memcache.

[Method A appstats trace]

B. Large Fetch

real: 12.12s

Effectively the same as option A but a bit faster for some reason.

[Method B appstats trace]

C. Async fetches across time range

real: 15.251s

This appears to provide more parallelism at the start, but seems to get slowed down by a sequence of next calls while iterating over the results. It also doesn't seem to be able to overlap the session memcache lookups with the pending queries.

[Method C appstats trace]

D. Async mapping

real: 13.752s

This one is the hardest for me to understand. It looks like it has a good deal of overlap, but everything seems to stretch out into a waterfall instead of running in parallel.

[Method D appstats trace]

Recommendations

Based upon all this, what am I missing? Am I just hitting a limit on App Engine, or is there a better way to pull down large numbers of entities in parallel?

I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to App Engine in parallel, but this seems pretty brute force. I would really expect that App Engine should be able to handle this use case, so I am guessing there is something I am missing.

Update

In the end I found that option C was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.

After getting advice from several people, I found that the following items were key to understand and keep in mind:

  • Multiple queries can run in parallel
  • Only 10 RPCs can be in flight at once
  • Try to denormalize to the point that there are no secondary queries
  • This type of task is better left to map reduce and task queues, not real-time queries

So what I did to make it faster:

  • I partitioned the query space from the beginning based upon time. (Note: the more equal the partitions are in terms of entities returned, the better.)
  • I denormalized the data further to remove the need for the secondary session query
  • I made use of ndb async operations and wait_any() to overlap the queries with the processing (see the sketch after this list)
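A rough sketch of what that final combination might look like; the partition_time_range helper and the denormalized Sample.customer property here are assumptions based on the description above, not the actual code:

# Kick off one async query per time partition (partition_time_range is a
# hypothetical helper that yields (start, end) pairs covering the range).
q_futures = []
for cur_start, cur_end in partition_time_range(start_time, end_time, 20):
   part_q = q.filter(Sample.timestamp >= cur_start,
                     Sample.timestamp < cur_end)
   q_futures.append(part_q.fetch_async(limit=None, **query_opts))

# Process whichever partition finishes first, overlapping the processing
# with the queries still in flight (remember: at most 10 RPCs run at once).
while q_futures:
   f = ndb.Future.wait_any(q_futures)
   q_futures.remove(f)
   for sample in f.get_result():
      # Assumes the customer id was denormalized onto Sample, so no
      # secondary Session lookup is needed.
      addCountForPeriod(sample.customer, sample.timestamp)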

I am still not getting the performance I would expect or would like, but it is workable for now. I just wish there were a better way to pull large numbers of sequential entities into memory quickly in handlers.
