CNTK - 内存和大型数据集

  • 简述

    在本章中,我们将学习如何使用 CNTK 中的内存和大型数据集。
  • 使用小内存数据集进行训练

    当我们谈到将数据输入 CNTK 训练器时,可以有多种方法,但这取决于数据集的大小和数据的格式。数据集可以是内存中的小型数据集或大型数据集。
    在本节中,我们将使用内存数据集。为此,我们将使用以下两个框架 -
    • Numpy
    • Pandas
  • 使用 Numpy 数组

    在这里,我们将使用 CNTK 中基于 numpy 的随机生成数据集。在此示例中,我们将模拟二进制分类问题的数据。假设,我们有一组具有 4 个特征的观察结果,并希望使用我们的深度学习模型预测两个可能的标签。

    实现示例

    为此,首先我们必须生成一组标签,其中包含我们想要预测的标签的单热向量表示。它可以在以下步骤的帮助下完成 -
    第 1 步- 导入numpy包,如下所示 -
    
    
    import numpy as np
    
    num_samples = 20000
    
    
    第 2 步- 接下来,使用np.eye函数生成标签映射,如下所示 -
    
    
    label_mapping = np.eye(2)
    
    
    第 3 步- 现在使用np.random.choice函数,收集 20000 个随机样本,如下所示 -
    
    
    y = label_mapping[np.random.choice(2,num_samples)].astype(np.float32)
    
    
    第 4 步- 现在最后使用 np.random.random 函数,生成一个随机浮点值数组,如下所示 -
    
    
    x = np.random.random(size=(num_samples, 4)).astype(np.float32)
    
    
    有一次,我们生成了一个随机浮点值数组,我们需要将它们转换为 32 位浮点数,以便它可以匹配 CNTK 期望的格式。让我们按照以下步骤执行此操作 -
    第 5 步- 从 cntk.layers 模块导入密集和顺序层函数,如下所示 -
    
    
    from cntk.layers import Dense, Sequential
    
    
    第 6 步- 现在,我们需要为网络中的层导入激活函数。让我们导入sigmoid作为激活函数 -
    
    
    from cntk import input_variable, default_options
    
    from cntk.ops import sigmoid
    
    
    第 7 步- 现在,我们需要导入损失函数来训练网络。让我们导入binary_cross_entropy作为损失函数 -
    
    
    from cntk.losses import binary_cross_entropy
    
    
    第 8 步- 接下来,我们需要定义网络的默认选项。在这里,我们将提供sigmoid激活函数作为默认设置。此外,使用顺序层函数创建模型如下 -
    
    
    with default_options(activation=sigmoid):
    
    model = Sequential([Dense(6),Dense(2)])
    
    
    第 9 步- 接下来,使用 4 个输入特征初始化一个input_variable,作为网络的输入。
    
    
    features = input_variable(4)
    
    
    第 10 步- 现在,为了完成它,我们需要将特征变量连接到 NN。
    
    
    z = model(features)
    
    
    因此,现在我们有了一个 NN,借助以下步骤,让我们使用内存数据集对其进行训练 -
    第 11 步- 要训练这个 NN,首先我们需要从cntk.learners模块导入学习者。我们将按如下方式导入sgd学习器 -
    
    
    from cntk.learners import sgd
    
    
    第 12 步- 同时从cntk.logging模块导入ProgressPrinter 。
    
    
    from cntk.logging import ProgressPrinter
    
    progress_writer = ProgressPrinter(0)
    
    
    第 13 步- 接下来,为标签定义一个新的输入变量,如下所示 -
    
    
    labels = input_variable(2)
    
    
    第 14 步- 为了训练 NN 模型,接下来,我们需要使用binary_cross_entropy函数定义损失。此外,提供模型 z 和标签变量。
    
    
    loss = binary_cross_entropy(z, labels)
    
    
    第 15 步- 接下来,初始化sgd学习器,如下所示 -
    
    
    learner = sgd(z.parameters, lr=0.1)
    
    
    第 16 步- 最后,在损失函数上调用 train 方法。此外,为它提供输入数据、sgd学习器和progress_printer 。 -
    
    
    training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=[progress_writer])
    
    

    完整的实现示例

    
    
    import numpy as np
    
    num_samples = 20000
    
    label_mapping = np.eye(2)
    
    y = label_mapping[np.random.choice(2,num_samples)].astype(np.float32)
    
    x = np.random.random(size=(num_samples, 4)).astype(np.float32)
    
    from cntk.layers import Dense, Sequential
    
    from cntk import input_variable, default_options
    
    from cntk.ops import sigmoid
    
    from cntk.losses import binary_cross_entropy
    
    with default_options(activation=sigmoid):
    
       model = Sequential([Dense(6),Dense(2)])
    
    features = input_variable(4)
    
    z = model(features)
    
    from cntk.learners import sgd
    
    from cntk.logging import ProgressPrinter
    
    progress_writer = ProgressPrinter(0)
    
    labels = input_variable(2)
    
    loss = binary_cross_entropy(z, labels)
    
    learner = sgd(z.parameters, lr=0.1)
    
    training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=[progress_writer])
    
    

    输出

    
    
    Build info:
    
         Built time: *** ** **** 21:40:10
    
         Last modified date: *** *** ** 21:08:46 2019
    
         Build type: Release
    
         Build target: CPU-only
    
         With ASGD: yes
    
         Math lib: mkl
    
         Build Branch: HEAD
    
         Build SHA1:ae9c9c7c5f9e6072cc9c94c254f816dbdc1c5be6 (modified)
    
         MPI distribution: Microsoft MPI
    
         MPI version: 7.0.12437.6
    
    -------------------------------------------------------------------
    
    average   since   average   since examples
    
    loss      last    metric    last
    
    ------------------------------------------------------
    
    Learning rate per minibatch: 0.1
    
    1.52      1.52      0         0     32
    
    1.51      1.51      0         0     96
    
    1.48      1.46      0         0    224
    
    1.45      1.42      0         0    480
    
    1.42       1.4      0         0    992
    
    1.41      1.39      0         0   2016
    
    1.4       1.39      0         0   4064
    
    1.39      1.39      0         0   8160
    
    1.39      1.39      0         0  16352
    
    
  • 使用 Pandas 数据框

    Numpy 数组可以包含的内容非常有限,并且是存储数据的最基本方式之一。例如,单个 n 维数组可以包含单一数据类型的数据。但另一方面,对于许多现实世界的案例,我们需要一个能够在单个数据集中处理多个数据类型的库。
    其中一个名为 Pandas 的 Python 库可以更轻松地处理此类数据集。它引入了 DataFrame (DF) 的概念,并允许我们从以各种格式存储为 DF 的磁盘加载数据集。例如,我们可以读取存储为 CSV、JSON、Excel 等格式的 DF。

    实现示例

    在此示例中,我们将使用基于四个属性对三种可能的鸢尾花种类进行分类的示例。我们也在前面的部分中创建了这个深度学习模型。模型如下 -
    
    
    from cntk.layers import Dense, Sequential
    
    from cntk import input_variable, default_options
    
    from cntk.ops import sigmoid, log_softmax
    
    from cntk.losses import binary_cross_entropy
    
    model = Sequential([
    
    Dense(4, activation=sigmoid),
    
    Dense(3, activation=log_softmax)
    
    ])
    
    features = input_variable(4)
    
    z = model(features)
    
    
    上面的模型包含一个隐藏层和一个带有三个神经元的输出层,以匹配我们可以预测的类别数量。
    接下来,我们将使用训练方法和损失函数来训练网络。为此,首先我们必须加载和预处理 iris 数据集,使其与 NN 的预期布局和数据格式相匹配。它可以在以下步骤的帮助下完成 -
    第 1 步- 导入numpyPandas包,如下所示 -
    
    
    import numpy as np
    
    import pandas as pd
    
    
    第 2 步- 接下来,使用read_csv函数将数据集加载到内存中 -
    
    
    df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’,
    
     ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)
    
    
    第 3 步- 现在,我们需要创建一个字典,将数据集中的标签与其对应的数字表示进行映射。
    
    
    label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}
    
    
    第 4 步- 现在,通过在 DataFrame 上使用iloc索引器,选择前四列,如下所示 -
    
    
    x = df_source.iloc[:, :4].values
    
    
    第 5 步- 接下来,我们需要选择物种列作为数据集的标签。可以按如下方式完成 -
    
    
    y = df_source[‘species’].values
    
    
    第 6 步- 现在,我们需要映射数据集中的标签,这可以通过使用label_mapping来完成。此外,使用one_hot编码将它们转换为 one-hot 编码数组。
    
    
    y = np.array([one_hot(label_mapping[v], 3) for v in y])
    
    
    第 7 步- 接下来,要将特征和映射标签与 CNTK 一起使用,我们需要将它们都转换为浮点数 -
    
    
    x= x.astype(np.float32)
    
    y= y.astype(np.float32)
    
    
    众所周知,标签作为字符串存储在数据集中,CNTK 无法处理这些字符串。这就是原因,它需要代表标签的 one-hot 编码向量。为此,我们可以定义一个函数one_hot如下 -
    
    
    def one_hot(index, length):
    
    result = np.zeros(length)
    
    result[index] = index
    
    return result
    
    
    现在,我们有了正确格式的 numpy 数组,借助以下步骤,我们可以使用它们来训练我们的模型 -
    第 8 步- 首先,我们需要导入损失函数来训练网络。让我们导入binary_cross_entropy_with_softmax作为损失函数 -
    
    
    from cntk.losses import binary_cross_entropy_with_softmax
    
    
    第 9 步- 要训练这个 NN,我们还需要从cntk.learners模块导入学习器。我们将按如下方式导入sgd学习器 -
    
    
    from cntk.learners import sgd
    
    
    第 10 步- 同时从cntk.logging模块导入ProgressPrinter 。
    
    
    from cntk.logging import ProgressPrinter
    
    progress_writer = ProgressPrinter(0)
    
    
    第 11 步- 接下来,为标签定义一个新的输入变量,如下所示 -
    
    
    labels = input_variable(3)
    
    
    第 12 步- 为了训练 NN 模型,接下来,我们需要使用binary_cross_entropy_with_softmax函数定义损失。还提供模型 z 和标签变量。
    
    
    loss = binary_cross_entropy_with_softmax (z, labels)
    
    
    第 13 步- 接下来,初始化sgd学习器,如下所示 -
    
    
    learner = sgd(z.parameters, 0.1)
    
    
    第 14 步- 最后,在损失函数上调用 train 方法。此外,为它提供输入数据、sgd学习器和progress_printer
    
    
    training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=
    
    [progress_writer],minibatch_size=16,max_epochs=5)
    
    

    完整的实现示例

    
    
    from cntk.layers import Dense, Sequential
    
    from cntk import input_variable, default_options
    
    from cntk.ops import sigmoid, log_softmax
    
    from cntk.losses import binary_cross_entropy
    
    model = Sequential([
    
    Dense(4, activation=sigmoid),
    
    Dense(3, activation=log_softmax)
    
    ])
    
    features = input_variable(4)
    
    z = model(features)
    
    import numpy as np
    
    import pandas as pd
    
    df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’, ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)
    
    label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}
    
    x = df_source.iloc[:, :4].values
    
    y = df_source[‘species’].values
    
    y = np.array([one_hot(label_mapping[v], 3) for v in y])
    
    x= x.astype(np.float32)
    
    y= y.astype(np.float32)
    
    def one_hot(index, length):
    
    result = np.zeros(length)
    
    result[index] = index
    
    return result
    
    from cntk.losses import binary_cross_entropy_with_softmax
    
    from cntk.learners import sgd
    
    from cntk.logging import ProgressPrinter
    
    progress_writer = ProgressPrinter(0)
    
    labels = input_variable(3)
    
    loss = binary_cross_entropy_with_softmax (z, labels)
    
    learner = sgd(z.parameters, 0.1)
    
    training_summary=loss.train((x,y),parameter_learners=[learner],callbacks=[progress_writer],minibatch_size=16,max_epochs=5)
    
    

    输出

    
    
    Build info:
    
         Built time: *** ** **** 21:40:10
    
         Last modified date: *** *** ** 21:08:46 2019
    
         Build type: Release
    
         Build target: CPU-only
    
         With ASGD: yes
    
         Math lib: mkl
    
         Build Branch: HEAD
    
         Build SHA1:ae9c9c7c5f9e6072cc9c94c254f816dbdc1c5be6 (modified)
    
         MPI distribution: Microsoft MPI
    
         MPI version: 7.0.12437.6
    
    -------------------------------------------------------------------
    
    average    since    average   since   examples
    
    loss        last     metric   last
    
    ------------------------------------------------------
    
    Learning rate per minibatch: 0.1
    
    1.1         1.1        0       0      16
    
    0.835     0.704        0       0      32
    
    1.993      1.11        0       0      48
    
    1.14       1.14        0       0     112
    
    [………]
    
    
  • 使用大型数据集进行训练

    在上一节中,我们使用 Numpy 和 pandas 处理了小型内存数据集,但并非所有数据集都这么小。特别是包含图像、视频、声音样本的数据集很大。MinibatchSource是一个组件,可以分块加载数据,由 CNTK 提供,用于处理如此大的数据集。MinibatchSource组件的一些特性如下:
    • MinibatchSource可以通过自动随机化从数据源读取的样本来防止 NN 过拟合。
    • 它具有内置的转换管道,可用于扩充数据。
    • 它将数据加载到与训练过程分开的后台线程上。
    在以下部分中,我们将探讨如何使用带有内存不足数据的小批量源来处理大型数据集。我们还将探索如何使用它来训练 NN。

    创建 MinibatchSource 实例

    在上一节中,我们使用了鸢尾花示例,并使用 Pandas DataFrames 处理了小型内存数据集。在这里,我们将使用MinibatchSource替换使用来自 pandas DF 的数据的代码。首先,我们需要借助以下步骤创建MinibatchSource的实例 -

    实现示例

    第 1 步- 首先,从cntk.io模块导入 minibatchsource 的组件,如下所示 -
    
    
    from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer,
    
     INFINITY_REPEAT
    
    
    第 2 步- 现在,通过使用StreamDef类,为标签创建一个流定义。
    
    
    labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
    
    
    Step 3 - 接下来,创建从输入文件中读取特征文件,创建另一个StreamDef实例,如下所示。
    
    
    feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
    
    
    第 4 步- 现在,我们需要提供iris.ctf文件作为输入并初始化解串器,如下所示 -
    
    
    deserializer = CTFDeserializer(‘iris.ctf’, StreamDefs(labels=
    
    label_stream, features=features_stream)
    
    
    第 5 步- 最后,我们需要使用反序列化器创建minisourceBatch的实例,如下所示 -
    
    
    Minibatch_source = MinibatchSource(deserializer, randomize=True)
    
    

    创建 MinibatchSource 实例 - 完整的实现示例

    
    
    from cntk.io import StreamDef, StreamDefs, MinibatchSource, CTFDeserializer, INFINITY_REPEAT
    
    labels_stream = StreamDef(field=’labels’, shape=3, is_sparse=False)
    
    feature_stream = StreamDef(field=’features’, shape=4, is_sparse=False)
    
    deserializer = CTFDeserializer(‘iris.ctf’, StreamDefs(labels=label_stream, features=features_stream)
    
    Minibatch_source = MinibatchSource(deserializer, randomize=True)
    
    

    创建 MCTF 文件

    正如您在上面看到的,我们正在从“iris.ctf”文件中获取数据。它具有称为 CNTK 文本格式 (CTF) 的文件格式。必须创建一个 CTF 文件来获取我们上面创建的MinibatchSource实例的数据。让我们看看如何创建 CTF 文件。

    实现示例

    第 1 步- 首先,我们需要导入 pandas 和 numpy 包,如下所示 -
    
    
    import pandas as pd
    
    import numpy as np
    
    
    第 2 步- 接下来,我们需要将数据文件(即 iris.csv)加载到内存中。然后,将其存储在df_source变量中。
    
    
    df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’, ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)
    
    
    Step 3 - 现在,通过使用iloc indexer 作为特征,获取前四列的内容。此外,使用来自物种列的数据如下 -
    
    
    features = df_source.iloc[: , :4].values
    
    labels = df_source[‘species’].values
    
    
    第 4 步- 接下来,我们需要创建标签名称与其数字表示之间的映射。可以通过如下创建label_mapping来完成 -
    
    
    label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}
    
    
    第 5 步- 现在,将标签转换为一组单热编码向量,如下所示 -
    
    
    labels = [one_hot(label_mapping[v], 3) for v in labels]
    
    
    现在,就像我们之前所做的那样,创建一个名为one_hot的实用函数来对标签进行编码。可以按如下方式完成 -
    
    
    def one_hot(index, length):
    
    result = np.zeros(length)
    
    result[index] = 1
    
    return result
    
    
    由于我们已经加载并预处理了数据,是时候将其以 CTF 文件格式存储在磁盘上。我们可以借助以下 Python 代码来做到这一点 -
    
    
    With open(‘iris.ctf’, ‘w’) as output_file:
    
    for index in range(0, feature.shape[0]):
    
    feature_values = ‘ ‘.join([str(x) for x in np.nditer(features[index])])
    
    label_values = ‘ ‘.join([str(x) for x in np.nditer(labels[index])])
    
    output_file.write(‘features {} | labels {} \n’.format(feature_values, label_values))
    
    

    创建 MCTF 文件 - 完整的实现示例

    
    
    import pandas as pd
    
    import numpy as np
    
    df_source = pd.read_csv(‘iris.csv’, names = [‘sepal_length’, ‘sepal_width’, ‘petal_length’, ‘petal_width’, ‘species’], index_col=False)
    
    features = df_source.iloc[: , :4].values
    
    labels = df_source[‘species’].values
    
    label_mapping = {‘Iris-Setosa’ : 0, ‘Iris-Versicolor’ : 1, ‘Iris-Virginica’ : 2}
    
    labels = [one_hot(label_mapping[v], 3) for v in labels]
    
    def one_hot(index, length):
    
    result = np.zeros(length)
    
    result[index] = 1
    
    return result
    
    With open(‘iris.ctf’, ‘w’) as output_file:
    
    for index in range(0, feature.shape[0]):
    
    feature_values = ‘ ‘.join([str(x) for x in np.nditer(features[index])])
    
    label_values = ‘ ‘.join([str(x) for x in np.nditer(labels[index])])
    
    output_file.write(‘features {} | labels {} \n’.format(feature_values, label_values))
    
    
  • 提供数据

    创建MinibatchSource实例后,我们需要对其进行训练。我们可以使用与处理小型内存数据集时相同的训练逻辑。在这里,我们将使用MinibatchSource实例作为损失函数训练方法的输入,如下所示 -

    实现示例

    步骤 1 - 为了记录培训课程的输出,首先从cntk.logging模块导入 ProgressPrinter,如下所示 -
    
    
    from cntk.logging import ProgressPrinter
    
    
    第 2 步- 接下来,要设置培训课程,请从cntk.train模块导入培训师training_session,如下所示 -
    
    
    from cntk.train import Trainer, 
    
    
    第 3 步- 现在,我们需要定义一些常量,如minibatch_sizesamples_per_epochnum_epochs如下 -
    
    
    minbatch_size = 16
    
    samples_per_epoch = 150
    
    num_epochs = 30
    
    
    第 4 步- 接下来,为了了解 CNTK 在训练期间如何读取数据,我们需要定义网络输入变量和小批量源中的流之间的映射。
    
    
    input_map = {
    
         features: minibatch.source.streams.features,
    
         labels: minibatch.source.streams.features
    
    }
    
    
    第 5 步- 接下来,要记录训练过程的输出,使用新的ProgressPrinter实例初始化progress_printer变量,如下所示 -
    
    
    progress_writer = ProgressPrinter(0)
    
    
    第 6 步- 最后,我们需要在损失上调用 train 方法,如下所示 -
    
    
    train_history = loss.train(minibatch_source,
    
    parameter_learners=[learner],
    
      model_inputs_to_streams=input_map,
    
    callbacks=[progress_writer],
    
    epoch_size=samples_per_epoch,
    
    max_epochs=num_epochs)
    
    

    提供数据 - 完整的实施示例

    
    
    from cntk.logging import ProgressPrinter
    
    from cntk.train import Trainer, training_session
    
    minbatch_size = 16
    
    samples_per_epoch = 150
    
    num_epochs = 30
    
    input_map = {
    
       features: minibatch.source.streams.features,
    
       labels: minibatch.source.streams.features
    
    }
    
    progress_writer = ProgressPrinter(0)
    
    train_history = loss.train(minibatch_source,
    
    parameter_learners=[learner],
    
    model_inputs_to_streams=input_map,
    
    callbacks=[progress_writer],
    
    epoch_size=samples_per_epoch,
    
    max_epochs=num_epochs)
    
    

    输出

    
    
    -------------------------------------------------------------------
    
    average   since   average   since  examples
    
    loss      last     metric   last
    
    ------------------------------------------------------
    
    Learning rate per minibatch: 0.1
    
    1.21      1.21      0        0       32
    
    1.15      0.12      0        0       96
    
    [………]