likes
comments
collection
share

FATE联邦学习框架java实践(二)

作者站长头像
站长
· 阅读数 3

项目源码FateTest - 码云 - 开源中国 (gitee.com)

现在对于一个完整Fate实践的功能流程,还差数据更新上传执行任务。因此本篇主要实现数据更新功能和上传任务的功能。

1. Fate的数据更新功能

Fate并没有提供表中数据的更新功能,因此想要实现数据更新,只能是下载数据->程序更新数据->数据上传

下载数据文件
数据追加写入文件
文件更新后上传Fate

下载数据后解压为csv文件的方法以及数据上传到Fate的方法已经实现,还需要实现的是数据追加写入文件的方法。

  1. 文件根路径已经固定在Apollo配置,如有需要自行配置。
  2. 函数基本逻辑为将数据追加写入到指定命名的csv文件中,如果文件不存在或直接生成csv文件后写入
  3. 生成csv文件插入了表头(id),如有需要自行配置。

函数路径: FATE联邦学习框架java实践(二)

为了可靠性,数据更新方法会校验三个过程中的错误码,如果有一个流程执行失败,那么就会返回其失败的错误码。

数据更新方法源码路径如下:

FATE联邦学习框架java实践(二)

2. 上传Fate任务功能

根据官网API文档:API - FATE Flow (federatedai.github.io),上传任务的接口实现起来非常简单,如下图。

但是其请求参数的设置却非常复杂。

FATE联邦学习框架java实践(二)

接下来,我们需要做就是在JAVA中实现对参数的配置。如图,任务参数其实是一个嵌套的JSON,所以,要更改JSON中对应的参数并不容易。

FATE联邦学习框架java实践(二)

一个好消息,因为我自己这里是固定一个隐私求交集的任务,所以对于任务模块和partID这些参数都是固定死的,我需要更改的只有求交双方的表名。

经过技术调研,我们最后选择用jackSon对JSON解析后进行更改的方式进行任务参数配置。

FATE联邦学习框架java实践(二)

3. 任务状态查询功能

事实上,上传数据在Fate中也被认为是一个任务,所以在新的数据覆盖上传之前,我们不能去提交隐私求交任务。

因此,我们需要知道任务是否完成。

虽然,Fate提交任务并非任务成功后才返回值,但是Fate提供了任务状态查询接口让我们能够知道任务是否完成。

如图,接口实现如下:

FATE联邦学习框架java实践(二)

通过轮训这个接口,我们就能够知道任务是否已完成。对于任务查询接口的返回值官网没有明确说明,好在我已经一个一个都给你们试出来了。

  1. 任务处在running、waiting、ready的状态,设置了等待时长,因为任务执行时间过长(一般两分钟),高频率的轮询请求不太合理。
  2. 只有任务明确了成功(status为success)或者失败的状态才会结束轮询。
  3. 目前我还没见过其他的非失败状态,所以增加了打点,防止错漏。
while(!allUpdateJobFlag){
    // 跟踪任务进行状态
    boolean updateJobFlag = true;
    for (int i = 0; i < tmpIdList.size(); i++){
        String status = fateService.jobQuery(tmpId);
        if (StringUtils.equalsIgnoreCase(status, "success")){
            // 进行隐私求交任务
        }else if(StringUtils.equalsIgnoreCase(status, "running")){
            Thread.sleep(1000);
            updateJobFlag = false;
        }else if(StringUtils.equalsIgnoreCase(status, "waiting")){
            Thread.sleep(1000);
            updateJobFlag = false;
        }else if(StringUtils.equalsIgnoreCase(status, "ready")){
            Thread.sleep(1000);
            updateJobFlag = false;
        }else{ // 任务失败直接略过
            Cat.logEvent("status",status);
        }
    }
    allUpdateJobFlag = updateJobFlag;
}