Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


python - Parallelize tf.from_generator using tf.contrib.data.parallel_interleave

I have a bunch of JSON array files (AVRO, to be accurate), and each of them yields multiple samples for training a Keras model. Using ideas from @GPhilo and @jsimsa, I came up with the code below to parallelize my input pipeline, but I can't figure out how to design generator(n) so that it divides the work of processing the files. The code fails inside parse_file(f) because that function expects a string file path, not a Tensor:

N = num_cores = 2
files_to_process = ["f1.avro", "f2.avro", "f3.avro"]
shuffle_size = prefetch_buffer = 1000
batch_size = 512

def generator(n):
    size = math.ceil(len(files_to_process) / N)
    start_index = n * size
    end_index = start_index + size

    def gen():
        # for f in files_to_process[start_index:end_index]:
        for f in tf.slice(files_to_process, start_index, size):
            yield f

    return gen

def dataset(n):
    return tf.data.Dataset.from_generator(generator(n), (tf.string,))

def process_file(f):
    examples_x, examples_y = parse_file(f)
    return examples_x, examples_y

ds = tf.data.Dataset.range(N)
ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N))
ds = ds.map(process_file, num_parallel_calls=N)
ds = ds.prefetch(prefetch_buffer)
ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
ds = ds.batch(batch_size).shuffle(shuffle_size)

...
myTfKerasModel.fit( ds.make_one_shot_iterator(), NUM_TRAIN_SAMPLES // batch_size )

  • What is the correct way to design generator(n) here?
  • Is this an optimized way to design my input pipeline using parallel_interleave and flat_map?

1 Reply


It seems to me you're complicating your life unnecessarily with the generator. This is how I'd implement your input pipeline:

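import tensorflow as tf  # TensorFlow 1.x API

def parse_file_tf(filename):
    # tf.py_func runs the plain-Python parse_file from inside the graph
    return tf.py_func(parse_file, [filename], [tf.float32, tf.float32])

# version with map
files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.map(parse_file_tf, num_parallel_calls=N)
# split each file's arrays into individual (x, y) examples
dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x))
# shuffle individual examples before batching them
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()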

To test it, I define a dummy parse_file as:

import numpy as np

i = 0
def parse_file(f):
    global i
    i += 1
    # mimics variable-length examples_x, examples_y
    values = np.asarray([i] * i, dtype=np.float32)
    return values, values.copy()

which I feed into a basic loop that shows what the iterator returns:

sess = tf.Session()
x, y = it.get_next()  # build the get_next op once, outside the loop
try:
    while True:
        vx, vy = sess.run([x, y])
        print(vx)
        print(vy)
except tf.errors.OutOfRangeError:
    pass  # the dataset is exhausted
sess.close()

Running the code above prints:

[2. 3. 2. 1. 3. 3.]
[2. 3. 2. 1. 3. 3.]

Explanation of the pipeline

Essentially, I leave the parallelization issue to map, where I can pass the number of threads it should run. No need for generators iterating over ranges and those extra complications.

I chose map over parallel_interleave because the latter requires you to generate a Dataset instance for each item it returns, which doesn't really make sense in your case: by the time parse_file returns, you have already loaded all of the file's values into memory. parallel_interleave makes sense if you generate the values slowly (e.g., by applying tf.data.TFRecordDataset to a list of filenames), but if your dataset fits in memory, go for map.
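
To make the difference concrete, here is a minimal pure-Python sketch (no TensorFlow) of the element order each approach produces. The helpers map_then_flatten and interleave and the toy load_all function are hypothetical names used only for illustration. The sketch mimics a sequential, deterministic interleave that takes one element per source in turn; parallel_interleave may return elements in a different order.

```python
from itertools import chain, islice


def load_all(name):
    """Toy stand-in for parse_file: returns every sample of one file at once."""
    return [f"{name}:{k}" for k in range(3)]


def map_then_flatten(files, fn):
    """map + flat_map: each file is fully loaded, then its samples are emitted."""
    return list(chain.from_iterable(fn(f) for f in files))


def interleave(files, fn, cycle_length):
    """Round-robin over up to cycle_length per-file iterators."""
    pending = iter(files)
    # Open the first cycle_length sources.
    active = [iter(fn(f)) for f in islice(pending, cycle_length)]
    out = []
    while active:
        next_round = []
        for source in active:
            try:
                out.append(next(source))
                next_round.append(source)
            except StopIteration:
                # This source is exhausted: open the next file, if any.
                nxt = next(pending, None)
                if nxt is not None:
                    next_round.append(iter(fn(nxt)))
        active = next_round
    return out


files = ["f1.avro", "f2.avro", "f3.avro"]
print(map_then_flatten(files, load_all))
print(interleave(files, load_all, cycle_length=2))
```

With map, whole files are processed in parallel and their samples stay contiguous until the shuffle step; with interleave, samples from cycle_length files are mixed as they are produced, which only pays off when each source yields its samples incrementally.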

As for the tf.py_func limitations, they affect only the input pipeline, not your trained network. Ideally, you'll have one pipeline for training and a different one for your final use of the network. You only need to take care of the limitations in the latter, while for training (unless you do something very specific with distributed training and/or moving the training across machines) you're reasonably safe.
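
One practical detail when wrapping parse_file with tf.py_func: under Python 3, a tf.string value reaches the wrapped function as a bytes object rather than a str, so a parser that expects a string path may need a decoding step. A minimal sketch, assuming UTF-8 file names; as_path and parse_file_py are hypothetical helper names, and the parse_file shown here is only a dummy standing in for the real parser:

```python
def as_path(filename):
    """Return a str path, decoding the bytes object passed for a tf.string value."""
    if isinstance(filename, bytes):
        return filename.decode("utf-8")
    return str(filename)


def parse_file(path):
    """Dummy parser for illustration: insists on a str path, like the real one."""
    if not isinstance(path, str):
        raise TypeError("parse_file expects a str path")
    return [len(path)], [len(path)]


def parse_file_py(filename):
    """Function to hand to tf.py_func in place of parse_file."""
    return parse_file(as_path(filename))
```

Passing parse_file_py instead of parse_file to tf.py_func keeps the real parser unchanged and confines the conversion to one place.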


Version with Generator

If your JSON files are very big and their content won't fit in memory, you can use a generator, though one slightly different from the approach you began with. The idea is that the generator goes through the JSON file and yields one record at a time, so the generator has to be your parse_file function. As an example, let's assume you have the following parse_file generator:

i = 3
def parse_file(filename):
    global i
    i += 1
    n = i  # local copy, so later calls do not change this generator
    for _ in range(n):
        yield n, n  # one (x, y) record at a time

In this case, the pipeline would look as follows:

def wrap_generator(filename):
    # from_generator expects a callable, so pass parse_file itself and
    # forward the filename through args (available in later TF 1.x releases);
    # the generator then receives the filename as a bytes object
    return tf.data.Dataset.from_generator(
        parse_file, (tf.float32, tf.float32), args=(filename,))

files = tf.data.Dataset.from_tensor_slices(files_to_process)
dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N))
dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2)
it = dataset.make_one_shot_iterator()

Note that here we need to use parallel_interleave because we turn the generators into Dataset instances from which we extract values. The flat_map step is dropped because the generator already yields one record at a time; the rest stays the same.

Feeding this to the same sample loop as above prints:

[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
[6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]
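
For real files, the one-record-at-a-time generator described above could look roughly like the sketch below. It assumes newline-delimited JSON with one object per line and hypothetical keys "x" and "y"; the name stream_records is also hypothetical, and actual AVRO files would need an AVRO reader in place of the json module. It accepts both str and bytes, since TensorFlow hands string values to Python code as bytes.

```python
import json


def stream_records(filename):
    """Yield one (x, y) record at a time instead of loading the whole file."""
    if isinstance(filename, bytes):
        filename = filename.decode("utf-8")
    with open(filename, "r", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # skip blank lines
            record = json.loads(line)
            # "x" and "y" are assumed field names, not taken from the question
            yield record["x"], record["y"]
```

Because only one line is held in memory at a time, the memory footprint stays flat regardless of file size.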
