创建联邦学习作业

本页介绍了如何使用设备端个性化提供的联邦学习 API,通过联邦平均学习过程和固定的正态噪声训练模型。

准备工作

在开始之前,请在测试设备上完成以下步骤:

  1. 确保已安装 OnDevicePersonalization 模块。该模块已于 2024 年 4 月以自动更新形式推出。

    # List the modules installed on the device
    adb shell pm list packages --apex-only --show-versioncode
    

    确保以下模块列出了版本代码为 341717000 或更高版本:

    package:com.google.android.ondevicepersonalization versionCode:341717000

    如果该模块未列出,请依次转到设置 > 安全和隐私 > 更新 > Google Play 系统更新,确保您的设备是最新版本。根据需要选择更新

  2. 启用所有与联邦学习相关的新功能。

    # Enable On-Device Personalization apk.
    adb shell device_config put on_device_personalization global_kill_switch false
    # Enable On-Device Personalization APIs.
    adb shell device_config put on_device_personalization enable_ondevicepersonalization_apis true
    # Enable On-Device Personalization overriding.
    adb shell device_config put on_device_personalization enable_personalization_status_override true
    adb shell device_config put on_device_personalization personalization_status_override_value true
    # Enable Federated Compute apk.
    adb shell device_config put on_device_personalization federated_compute_kill_switch false
    

创建联合学习任务

联合学习客户端-服务器拓扑,包含 8 个突出显示的步骤。
联邦学习客户端-服务器拓扑图,其中突出显示了八个步骤。

图中的数字编号在以下八个步骤中进行了详细说明。

配置联邦计算服务器

联合学习是一种映射缩减技术,在联合计算服务器(缩减器)和一组客户端(映射器)上运行。联合计算服务器维护每个联合学习任务的运行元数据和模型信息。概括来讲:

  • 联邦学习开发者创建新任务,并将任务运行元数据和模型信息上传到服务器。
  • 当联邦计算客户端向服务器发起新的任务分配请求时,服务器会检查任务的资格条件,并返回符合条件的任务信息。
  • 联邦计算客户端完成本地计算后,会将这些计算结果发送到服务器。然后,服务器对这些计算结果执行聚合和噪声处理,并将结果应用到最终模型。

如需详细了解这些概念,请查看:

ODP 使用增强型版本的联邦学习,也就是先将校准(集中)的噪声应用于汇总,然后再应用到模型。噪声的规模可确保汇总数据能保持差分隐私。

第 1 步:创建联合计算服务器

按照联邦计算项目中的说明设置您自己的联邦计算服务器。

第 2 步:准备已保存的 FunctionalModel

准备已保存的 'FunctionalModel' 文件。您可以使用 'functional_model_from_keras''Model' 转换为 'FunctionalModel',并使用 'save_functions_model' 将此 'FunctionalModel' 序列化为 'SavedModel'

functional_model = tff.learning.models.functional_model_from_keras(keras_model=model)
tff.learning.models.save_functional_model(functional_model, saved_model_path)

第 3 步:创建联合计算服务器配置

准备一个 fcp_server_config.json,其中包含政策、联邦学习设置和差分隐私设置。示例:

  # Identifies the set of client devices that will participate.
  population_name: "my_new_population"
  # Options you can choose:
  # * TRAINING_ONLY: Only one training task will be generated under this
  #                  population.
  # * TRAINING_AND_EVAL: One training task and one evaluation task will be
  #                      generated under this population.
  # * EVAL_ONLY: Only one evaluation task will be generated under this
  #              population.
  mode: TRAINING_AND_EVAL
  policies {
    # Policy for sampling on-device examples. It is checked every time a
    # device attempts to start a new training.
    min_separation_policy {
      # The minimum number of rounds before the same client participated.
      minimum_separation: 3
    }
    # Policy for releasing training results to developers. It is checked
    # when uploading a new task to the Federated Compute Server.
    model_release_policy {
      # Server stops training when number of training rounds reaches this
      # number.
      num_max_training_rounds: 1000
    }
  }
  # Federated learning setups. They are applied inside Task Builder.
  federated_learning {
    learning_process {
      # Use FED_AVG to build federated learning process. Options you can
      # choose:
      # * FED_AVG: Federated Averaging algorithm
      #            (https://arxiv.org/abs/2003.00295)
      # * FED_SDG: Federated SGD algorithm
      #            (https://arxiv.org/abs/1602.05629)
      type: FED_AVG
      # Optimizer used at client side training. Options you can choose:
      # * ADAM
      # * SGD
      client_optimizer: SGD
      # Learning rate used at client side training.
      client_learning_rate: 0.01
      # Optimizer used at server side training. Options you can choose:
      # * ADAM
      # * SGD
      server_optimizer: ADAM
      # Learning rate used at server side training.
      sever_learning_rate: 1
      runtime_config {
        # Number of participating devices for each round of training.
        report_goal: 2000
      }
      # List of metrics to be evaluated by the model during training and
      # evaluation. Federated Compute Server provides a list of allowed
      # metrics.
      metrics {
        name: "auc-roc"
      }
      metrics {
        name: "binary_accuracy"
      }
    }
    # Whether or not to generate a corresponding evaluation task under the same
    # population. If this field isn't set, only one training task is
    # generated under this population.
    evaluation {
      # The task id under the same population of the source training task that
      # this evaluation task evaluates.
      source_training_task_id: 1
      # Decides how checkpoints from the training task are chosen for
      # evaluation.
      # * every_k_round: the evaluation task randomly picks one checkpoint
      #                  from the past k rounds of training task checkpoints.
      # * every_k_hour: the evaluation task randomly picks one checkpoint
      #                 from the past k hours of training task checkpoints.
      checkpoint_selector: "every_1_round"
      # The traffic of this evaluation task in this population.
      evaluation_traffic: 0.1
      # Number of participating devices for each round of evaluation.
      report_goal: 200
    }
  }
  # Differential Privacy setups. They are applied inside the Task Builder.
  differential_privacy {
    # The DP aggregation algorithm you want to use. Options you can choose:
    # * FIXED_GAUSSIAN: Federated Learning DP-SGD with fixed clipping norm
    #                   described in "Learning Differentially Private Recurrent
    #                   Language Models" (https://arxiv.org/abs/1710.06963).
    # * ADAPTIVE_GAUSSIAN: Federated Learning DP-SGD with quantile-based clip
    #                      norm estimation described in "Differentially Private
    #                      Learning with Adaptive Clipping"
    #                      (https://arxiv.org/abs/1905.03871).
    # * TREE: DP-FTRL algorithm described in "Practical and Private (Deep)
    #         Learning without Sampling or Shuffling"
    #         (https://arxiv.org/abs/2103.00039).
    # * ADADPTIVE_TREE: DP-FTRL with adaptive clipping norm descirbed in
    #                  "Differentially Private Learning with Adaptive Clipping"
    #                  (https://arxiv.org/abs/1905.03871).
    type: FIXED_GAUSSIAN
    # Noise multiplier for the Gaussian noise.
    noise_multiplier: 0.1
    #   The value of the clipping norm.
    clip_norm: 0.1
  }

第 4 步:将 ZIP 配置提交到联邦计算服务器。

将 zip 文件和 fcp_server_config.json 提交到联邦计算服务器。

task_builder_client --task_builder_server='http://{federated_compute_server_endpoint}' --saved_model='saved_model' --task_config='fcp_server_config.json'

联合计算服务器端点是您在第 1 步中设置的服务器。

LiteRT 内置运算符库仅支持少数 TensorFlow 运算符(部分 TensorFlow 运算符)。支持的运算符集可能会因 OnDevicePersonalization 模块的不同版本而异。为了确保兼容性,系统会在任务创建期间在任务构建器中执行操作员验证流程。

  • 任务元数据中将包含支持的最低 OnDevicePersonalization 模块版本。您可以在任务制作工具的信息消息中找到此信息。

    I1023 22:16:53.058027 139653371516736 task_builder_client.py:109] Success! Tasks are built, and artifacts are uploaded to the cloud.
    I1023 22:16:53.058399 139653371516736 task_builder_client.py:112] applied_algorithms {
      learning_algo: FED_AVG
      client_optimizer: SGD
      server_optimizer: SGD
      dp_aggregator: FIXED_GAUSSIAN
    }
    metric_results {
      accepted_metrics: "binary_accuracy, binary_crossentropy, recall, precision, auc-roc, auc-pr"
    }
    dp_hyperparameters {
      dp_delta: 0.000001
      dp_epsilon: 6.4
      noise_multiplier: 1.0
      dp_clip_norm: 1.0
      num_training_rounds: 10000
    }
    
    I1023 22:16:53.058594 139653371516736 task_builder_client.py:113] training_task {
      min_client_version: "341912000"
    }
    eval_task {
      min_client_version: "341812000"
    }
    

    联邦计算服务器将向所有搭载版本高于 341812000 的 OnDevicePersonalization 模块的设备分配此任务。

  • 如果您的模型包含任何 OnDevicePersonalization 模块不支持的操作,系统会在任务创建期间生成错误消息。

    common.TaskBuilderException: Cannot build the ClientOnlyPlan: Please contact Google to register these ops: {'L2Loss': 'L2LossOp<CPUDevice, float>'}
    . Stop building remaining artifacts.
    
  • 您可以在 GitHub 上找到受支持的 flex 运算的详细列表。

创建 Android 联邦计算 APK

如需创建 Android 联邦计算 APK,您需要在 AndroidManifest.xml 中指定联邦计算服务器网址端点,联邦计算客户端会连接到该端点。

第 5 步:指定联邦计算服务器网址端点

AndroidManifest.xml 中指定您的联邦计算服务器网址端点(您在第 1 步中设置的),联邦计算客户端将连接到该端点。

<!-- Contents of AndroidManifest.xml -->
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
          package="com.example.odpsample" >
    <application android:label="OdpSample">
        <!-- XML resource that contains other ODP settings. -->
        <property android:name="android.ondevicepersonalization.ON_DEVICE_PERSONALIZATION_CONFIG"
                  android:resource="@xml/OdpSettings"></property>
        <!-- The service that ODP will bind to. -->
        <service android:name="com.example.odpsample.SampleService"
                android:exported="true" android:isolatedProcess="true" />
    </application>
</manifest>

<property> 标记中指定的 XML 资源文件还必须在 <service> 标记中声明服务类,并指定联邦计算客户端将连接到的联邦计算服务器网址端点:

<!-- Contents of res/xml/OdpSettings.xml -->
<on-device-personalization>
   <!-- Name of the service subclass -->
   <service name="com.example.odpsample.SampleService">
     <!-- If you want to use federated compute feature to train a model,
          specify this tag. -->
     <federated-compute-settings url="https://fcpserver.com/" />
   </service>
</on-device-personalization>

第 6 步:实现 IsolatedWorker#onTrainingExample API

实现设备端个性化公共 API IsolatedWorker#onTrainingExample 以生成训练数据。

IsolatedProcess 中运行的代码无法直接访问网络、本地磁盘或设备上运行的其他服务;不过,可以使用以下 API:

示例:

@Override public void onTrainingExample(
            @NonNull TrainingExampleInput input,
            @NonNull Consumer<TrainingExampleOutput> consumer) {
    // Check if the incoming training task is the task we want.
    if (input.getPopulationName() == "my_new_population") {
        TrainingExampleOutput result = new TrainingExampleOutput.Builder():
        RequestLogRecord record = this.getLogReader().getRequestLogRecord(1);
        int count = 1;
        // Iterate logging event table.
        for (ContentValues contentValues: record.rows()) {
            Features features = Features.newBuilder()
                // Retrieve carrier from user info.
                .putFeature("carrier", buildFeature(mUserData.getCarrier()))
                // Retrieve features from logging info.
                .putFeature("int_feature_1",
                    buildFeature(contentValues.get("int_feature_1")
            result.addTrainingExample(
                    Example.newBuilder()
                        .setFeatures(features).build().toByteArray())
                .addResumptionToken(
                    String.format("token%d", count).getBytes()))
                .build();
            count++;
        }
        consumer.accept(result.build());
    }
}

第 7 步:安排周期性训练任务。

设备端个性化提供了 FederatedComputeScheduler,供开发者调度或取消联邦计算作业。通过 IsolatedWorker 调用它有多种方式,可以按计划调用,也可以在异步下载完成时调用。下面给出了这两种方法的示例。

  • 基于时间表的选项。致电FederatedComputeScheduler#schedule(位于IsolatedWorker#onExecute)。

    @Override public void onExecute(
                @NonNull ExecuteInput input,
                @NonNull Consumer<ExecuteOutput> consumer
        ) {
        if (input != null && input.getAppParams() != null
            && input.getAppParams().getString("schedule_training") != null) {
            if (input.getAppParams().getString("schedule_training").isEmpty()) {
                consumer.accept(null);
                return;
            }
            TrainingInterval interval = new TrainingInterval.Builder()
                .setMinimumInterval(Duration.ofSeconds(10))
                .setSchedulingMode(2)
                .build();
            FederatedComputeScheduler.Params params = new FederatedComputeScheduler
                .Params(interval);
            FederatedComputeInput fcInput = new FederatedComputeInput.Builder()
                .setPopulationName(
                    input.getAppParams().getString("schedule_training")).build();
            mFCScheduler.schedule(params, fcInput);
    
            ExecuteOutput result = new ExecuteOutput.Builder().build();
            consumer.accept(result);
        }
    }
    
  • 下载完成选项。如果调度训练任务依赖于任何异步数据或进程,请在 IsolatedWorker#onDownloadCompleted 中调用 FederatedComputeScheduler#schedule

验证

以下步骤介绍了如何验证联邦学习任务是否正常运行。

第 8 步:验证联邦学习任务是否正常运行。

每一轮服务器端汇总时都会生成一个新的模型检查点和一个新的指标文件。

这些指标位于 JSON 格式的键值对文件中。该文件由您在第 3 步中定义的 Metrics 列表生成。下面给出了一个具有代表性的指标 JSON 文件示例:

{"server/client_work/train/binary_accuracy":0.5384615659713745, "server/client_work/train/binary_crossentropy":0.694046676158905, "server/client_work/train/recall":0.20000000298023224, "server/client_work/train/precision":0.3333333432674408, "server/client_work/train/auc-roc":0.3500000238418579, "server/client_work/train/auc-pr":0.44386863708496094, "server/finalizer/update_non_finite":0.0}

您可以使用类似以下脚本的内容来获取模型指标并监控训练性能:

import collections
import json
import matplotlib.pyplot as plt
from google.cloud import storage

# The population_name you set in fcp_server_config.json in Step 3.
POPULATION_NAME = 'my_new_population'
# The Google Cloud storage you set in Step 1.
GCS_BUCKET_NAME = 'fcp-gcs'
NUM_TRAINING_ROUND = 1000

storage_client = storage.Client()
bucket = storage_client.bucket(GCS_BUCKET_NAME)

metrics = collections.defaultdict(list)
for i in range(NUM_TRAINING_ROUND):
    blob = bucket.blob('{}/{}/1/{}/s/0/metrics'.format(GCS_BUCKET_NAME, POPULATION_NAME, i+1))
    with blob.open("r") as f:
                     metric = json.loads(f.read())
                    for metric_name in metric.keys():
                             metrics[metric_name].append(metric[metric_name])

for metric_name in metrics:
         print(metric_name)
         plt.plot(metrics[metric_name])
         plt.show()
绘制 auc-roc 指标时的效果示例图表。

请注意,在上例图表中:

  • x 轴表示训练次数。
  • y 轴是每轮 auc-roc 的值。

在设备端个性化中训练图片分类模型

在本教程中,EMNIST 数据集用于演示如何在 ODP 上运行联合学习任务。

第 1 步:创建 tff.learning.models.FunctionalModel

def get_image_classification_input_spec():
  return (
      tf.TensorSpec([None, 28, 28, 1], tf.float32),
      tf.TensorSpec([None, 1], tf.int64),
  )

def create_and_save_image_classification_functional_model(
    model_path: str,
) -> None:
  keras_model =  emnist_models.create_original_fedavg_cnn_model(
      only_digits=True
  )
  functional_model = tff.learning.models.functional_model_from_keras(
      keras_model=keras_model,
      input_spec=get_image_classification_input_spec(),
      loss_fn=tf.keras.losses.SparseCategoricalCrossentropy(),
  )
  tff.learning.models.save_functional_model(functional_model, model_path)
  • 您可以在 emnist_models 中找到 emnist keras 模型的详细信息。
  • TfLite 尚不能很好地支持 tf.sparse.SparseTensortf.RaggedTensor。构建模型时,尽可能多地使用 tf.Tensor
  • 构建学习流程时,ODP Task Builder 会覆盖所有指标,无需指定任何指标。该主题将在第 2 步:创建任务构建器配置
  • 支持两种类型的模型输入:

    • 类型 1. 元组(特征张量、标签张量)。

      • 创建模型时,input_spec 如下所示:
      def get_input_spec():
        return (
            tf.TensorSpec([None, 28, 28, 1], tf.float32),
            tf.TensorSpec([None, 1], tf.int64),
        )
      
      return tf.train.Example(
          features=tf.train.Features(
              feature={
                  'x': tf.train.Feature(
                      float_list=tf.train.FloatList(value=[1.0] * 784)
                  ),
                  'y': tf.train.Feature(
                      int64_list=tf.train.Int64List(
                          value=[1]
                      )
                  ),
              }
          )
      ).SerializeToString()
      
    • 类型 2. Tuple(Dict[feature_name, feature_tensor], label_tensor)

      • 创建模型时,input_spec 如下所示:
      def get_input_spec() -> (
          Tuple[collections.OrderedDict[str, tf.TensorSpec], tf.TensorSpec]
      ):
        return (
            collections.OrderedDict(
                [('feature-1', tf.TensorSpec([None, 1], tf.float32)),
                ('feature-2', tf.TensorSpec([None, 1], tf.float32))]
            ),
            tf.TensorSpec([None, 1], tf.int64),
        )
      
      return tf.train.Example(
          features=tf.train.Features(
              feature={
                  'feature-1': tf.train.Feature(
                      float_list=tf.train.FloatList(value=[1.0])
                  ),
                  'feature-2': tf.train.Feature(
                      float_list=tf.train.FloatList(value=[2.0])
                  ),
                  'my_label': tf.train.Feature(
                      int64_list=tf.train.Int64List(
                          value=[1]
                      )
                  ),
              }
          )
      ).SerializeToString()
      
      • 别忘了在任务构建器配置中注册 label_name
      mode: TRAINING_AND_EVAL  # Task execution mode
      population_name: "my_example_model"
      label_name: "my_label"
      
  • ODP 在构建学习过程时自动处理 DP。因此,在创建函数式模型时无需添加任何噪声。

  • 此已保存的函数模型的输出应与 GitHub 代码库中的示例类似。

第 2 步:创建任务构建器配置

您可以在我们的 GitHub 代码库中找到任务构建器配置示例

  • 训练和评估指标

    鉴于指标可能会泄露用户数据,任务构建器将列出学习过程可以生成和发布的指标。您可以在我们的 GitHub 代码库中找到完整列表

    以下是创建新的任务构建器配置时的指标列表示例:

    federated_learning {
      learning_process {
        metrics {
          name: "binary_accuracy"
        }
        metrics {
          name: "binary_crossentropy"
        }
        metrics {
          name: "recall"
        }
        metrics {
          name: "precision"
        }
        metrics {
          name: "auc-roc"
        }
        metrics {
          name: "auc-pr"
        }
      }
    }
    

如果您感兴趣的指标不在当前列表中,请与我们联系。

  • DP 配置

    您需要指定一些与 DP 相关的配置:

    policies {
      min_separation_policy {
        minimum_separation: 1
      }
      model_release_policy {
        num_max_training_rounds: 1000
        dp_target_epsilon: 10
        dp_delta: 0.000001
      }
    }
    differential_privacy {
      type: FIXED_GAUSSIAN
      clip_norm: 0.1
      noise_multiplier: 0.1
    }
    

第 3 步:将保存的模型和任务构建器配置上传到任何开发者的云端存储空间

上传任务构建器配置时,请务必更新 artifact_building 字段。

第 4 步:(可选)在不创建新任务的情况下构建测试工件

cd ${odp_fcp_github_repo}/python
bazel run //python/taskbuilder:task_builder_client -- --saved_model=${path_of_cloud_storage}/mnist_model/ --task_config=${path_of_cloud_storage}/mnist_cnn_task_config_build_artifact_only.pbtxt --build_artifact_only=true --task_builder_server=${task_builder_server_endpoint}

示例模型通过 Flex 操作检查和 dp 检查进行验证;您可以添加 skip_flex_ops_checkskip_dp_check,以便在验证期间绕过(由于一些缺少的 Flex 操作,此模型无法部署到当前版本的 ODP 客户端)。

cd ${odp_fcp_github_repo}/python
bazel run //python/taskbuilder:task_builder_client -- --saved_model=${path_of_cloud_storage}/mnist_model/ --task_config=${path_of_cloud_storage}/mnist_cnn_task_config_build_artifact_only.pbtxt --build_artifact_only=true --task_builder_server=${task_builder_server_endpoint} --skip_flex_ops_check=True --skip_dp_check=True
  • flex_ops_check:TensorFlow Lite 内置运算符库仅支持少数的 TensorFlow 运算符(TensorFlow Lite 与 TensorFlow 运算符的兼容性)。所有不兼容的 TensorFlow 操作都需要使用 flex 委托 (Android.bp) 进行安装。如果模型包含不受支持的运算,请与我们联系以进行注册:

    Cannot build the ClientOnlyPlan: Please contact Google to register these ops: {...}
    
  • 调试任务构建器的最佳方法是在本地启动一个任务构建器:

    # Starts a server at localhost:5000
    bazel run //python/taskbuilder:task_builder
    # Links to a server at localhost:5000 by removing task_builder_server flag
    bazel run //python/taskbuilder:task_builder_client -- --saved_model=${path_of_cloud_storage}/mnist_model/ --task_config=${path_of_cloud_storage}/mnist_cnn_task_config_build_artifact_only.pbtxt --build_artifact_only=true --skip_flex_ops_check=True --skip_dp_check=True
    

您可以在配置中指定的云端存储空间中找到生成的工件。它应该与 GitHub 代码库中的示例类似。

第 5 步:构建工件,并在 FCP 服务器上创建一对新的训练和评估任务。

移除 build_artifact_only 标志,系统会将构建的工件上传到 FCP 服务器。您应检查是否成功创建了一对训练和评估任务

cd ${odp_fcp_github_repo}/python
bazel run //python/taskbuilder:task_builder_client -- --saved_model=${path_of_cloud_storage}/mnist_model/ --task_config=${path_of_cloud_storage}/mnist_cnn_task_config.pbtxt --task_builder_server=${task_builder_server_endpoint}

第 6 步:准备好 FCP 客户端

第 7 步:监控

  • 服务器指标

    在我们的 GitHub 代码库中查找设置说明

每分钟作业数图表。
迭代处理时间图。
一段时间内的迭代次数图表。
  • 模型指标
图表:演示不同运行作业的指标比较。

您可以在一个图表中比较不同运行的各项指标。例如:

  • 紫色线条的 noise_multiplier 为 0.1
  • 粉色线条表示 noise_multipiler 0.3