Data Preprocessing
Data preprocessing follows the same process as in the original model. Some of the code is adapted to make better use of the compute capability of the Ascend AI Processor. The code shown in this section highlights those modifications.
Defining the Input Function input_fn
Data preprocessing of the ImageNet dataset is used as an example. The modified .py files and functions for adapting to the Ascend AI Processor are as follows.
| Function | Description | Location |
|---|---|---|
| input_fn() | Input function that processes the dataset for Estimator training and outputs real data. | official/r1/resnet/imagenet_main.py |
| resnet_main() | Main API that contains data input, run configuration, training, and validation. | official/r1/resnet/resnet_run_loop.py |
- Add the following imports to the official/r1/resnet/imagenet_main.py file:

      from hccl.manage.api import get_rank_size
      from hccl.manage.api import get_rank_id

- Obtain the number of Ascend AI Processors and the processor ID to support data parallelism.
Tweak: input_fn() in official/r1/resnet/imagenet_main.py (The changes are enclosed by the npu modify begin and npu modify end comments.)

      def input_fn(is_training,
                   data_dir,
                   batch_size,
                   num_epochs=1,
                   dtype=tf.float32,
                   datasets_num_private_threads=None,
                   parse_record_fn=parse_record,
                   input_context=None,
                   drop_remainder=False,
                   tf_data_experimental_slack=False):
          """Input function that provides training and validation batches.

          Args:
              is_training: Boolean indicating whether the input is used for training.
              data_dir: Path of the directory that contains the input dataset.
              batch_size: Size of each batch.
              num_epochs: Number of epochs.
              dtype: Data type of an image or feature.
              datasets_num_private_threads: Number of threads dedicated to tf.data.
              parse_record_fn: Entry point function for parsing TFRecords.
              input_context: tf.distribute.InputContext object passed by
                  tf.distribute.Strategy.
              drop_remainder: Whether to discard the last batch if it contains
                  fewer than batch_size elements. If True, the batch dimension
                  is fixed.
              tf_data_experimental_slack: Whether to enable the
                  experimental_slack option of tf.data.

          Returns:
              A dataset that can be used for iteration.
          """
          # Obtain the list of input files.
          filenames = get_filenames(is_training, data_dir)
          # Create a dataset with one element per file name.
          dataset = tf.data.Dataset.from_tensor_slices(filenames)

          if input_context:
              # Obtain the number of Ascend AI Processors and the processor ID
              # to support data parallelism.
              ############## npu modify begin #############
              dataset = dataset.shard(get_rank_size(), get_rank_id())
              ############## npu modify end ###############
              # tf.compat.v1.logging.info(
              #     'Sharding the dataset: input_pipeline_id=%d num_input_pipelines=%d' % (
              #         input_context.input_pipeline_id, input_context.num_input_pipelines))
              # dataset = dataset.shard(input_context.num_input_pipelines,
              #                         input_context.input_pipeline_id)

          if is_training:
              # Shuffle the input files.
              dataset = dataset.shuffle(buffer_size=_NUM_TRAIN_FILES)

          # cycle_length=10: read and deserialize 10 files in parallel.
          # You can increase the value if the CPU resources are sufficient.
          dataset = dataset.interleave(
              tf.data.TFRecordDataset,
              cycle_length=10,
              num_parallel_calls=tf.data.experimental.AUTOTUNE)

          return resnet_run_loop.process_record_dataset(
              dataset=dataset,
              is_training=is_training,
              batch_size=batch_size,
              shuffle_buffer=_SHUFFLE_BUFFER,
              parse_record_fn=parse_record_fn,
              num_epochs=num_epochs,
              dtype=dtype,
              datasets_num_private_threads=datasets_num_private_threads,
              drop_remainder=drop_remainder,
              tf_data_experimental_slack=tf_data_experimental_slack,
          )

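The effect of the dataset.shard(get_rank_size(), get_rank_id()) call can be illustrated without an Ascend environment. The following is a minimal sketch in plain Python, assuming the documented behavior of tf.data.Dataset.shard(num_shards, index), which keeps the elements whose position modulo num_shards equals index. The helper shard_filenames() and the file names are hypothetical; rank_size and rank_id stand in for the values that get_rank_size() and get_rank_id() would return.

```python
def shard_filenames(filenames, rank_size, rank_id):
    """Mimic tf.data.Dataset.shard: keep every rank_size-th file, starting at rank_id."""
    if not 0 <= rank_id < rank_size:
        raise ValueError("rank_id must be in [0, rank_size)")
    return filenames[rank_id::rank_size]


# Hypothetical TFRecord file names; the real list comes from get_filenames().
filenames = ["train-%05d-of-00008" % i for i in range(8)]

# Simulate a job with 4 devices (rank_size=4); each device has its own rank_id.
rank_size = 4
shards = [shard_filenames(filenames, rank_size, rank_id)
          for rank_id in range(rank_size)]

for rank_id, shard in enumerate(shards):
    print(rank_id, shard)
```

In this sketch the shards are disjoint and together cover the whole file list, so each device reads a different part of the dataset. Because the sharding in input_fn() is applied to the file names before interleave(), each device opens only the files in its own shard.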
- In input_fn(), drop_remainder must be set to True in both the training and testing scenarios.
Tweak: resnet_main() in official/r1/resnet/resnet_run_loop.py (The nested functions input_fn_train() and input_fn_eval() are tweaked.)

      def input_fn_train(num_epochs, input_context=None):
          ############## npu modify begin #############
          # Set dtype to tf.float16 for better data transfer performance.
          # In the current version, drop_remainder can only be set to True.
          # batch_size is the batch size of a single device, not the global batch size.
          return input_function(
              is_training=True,
              data_dir=flags_obj.data_dir,
              batch_size=flags_obj.batch_size,
              num_epochs=num_epochs,
              dtype=tf.float16,
              input_context=input_context,
              drop_remainder=True)

      def input_fn_eval():
          # Set dtype to tf.float16 for better data transfer performance.
          # In the current version, drop_remainder can only be set to True.
          # batch_size is the batch size of a single device, not the global batch size.
          # The modified input_fn() only checks whether input_context is truthy,
          # so passing True enables sharding for validation as well.
          return input_function(
              is_training=False,
              data_dir=flags_obj.data_dir,
              batch_size=flags_obj.batch_size,
              num_epochs=1,
              dtype=tf.float16,
              input_context=True,
              drop_remainder=True)
      ############## npu modify end ###############

      # The original input_fn_train() and input_fn_eval() are as follows:
      # def input_fn_train(num_epochs, input_context=None):
      #     return input_function(
      #         is_training=True,
      #         data_dir=flags_obj.data_dir,
      #         batch_size=distribution_utils.per_replica_batch_size(
      #             flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
      #         num_epochs=num_epochs,
      #         dtype=flags_core.get_tf_dtype(flags_obj),
      #         datasets_num_private_threads=flags_obj.datasets_num_private_threads,
      #         input_context=input_context)
      #
      # def input_fn_eval():
      #     return input_function(
      #         is_training=False,
      #         data_dir=flags_obj.data_dir,
      #         batch_size=distribution_utils.per_replica_batch_size(
      #             flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
      #         num_epochs=1,
      #         dtype=flags_core.get_tf_dtype(flags_obj))
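
The three comments in the modified functions (tf.float16 input, drop_remainder=True, and a per-device batch_size) can be made concrete with some arithmetic. The following plain-Python sketch uses hypothetical numbers (1000 samples per device, a per-device batch size of 32, 8 devices, and 224 x 224 x 3 images); none of these values come from the sample itself, and num_batches() is an illustrative helper rather than part of the model code.

```python
def num_batches(num_samples, batch_size, drop_remainder):
    """Number of batches produced when batching num_samples elements."""
    full, rest = divmod(num_samples, batch_size)
    if drop_remainder or rest == 0:
        return full
    return full + 1


# Hypothetical values, chosen only for illustration.
samples_per_device = 1000
per_device_batch = 32   # what flags_obj.batch_size means after the change
rank_size = 8           # number of devices in the job

# drop_remainder=True discards the final partial batch (1000 % 32 = 8 samples),
# so every batch has exactly per_device_batch elements.
kept = num_batches(samples_per_device, per_device_batch, drop_remainder=True)
all_batches = num_batches(samples_per_device, per_device_batch, drop_remainder=False)
dropped_samples = samples_per_device - kept * per_device_batch

# The global batch size is the per-device batch size times the device count.
global_batch = per_device_batch * rank_size

# A float16 image needs half the bytes of a float32 image.
elements = 224 * 224 * 3
bytes_fp32 = elements * 4
bytes_fp16 = elements * 2

print(kept, all_batches, dropped_samples, global_batch, bytes_fp32, bytes_fp16)
```

With these assumed numbers, the fixed batch dimension costs 8 samples per device per epoch, and halving the bytes per image is what the float16 comment refers to when it mentions data transfer performance.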