标题 | 企业内部分布式机器学习系统设计与实现 |
范文 | 秦子实 摘要:随着机器学习算法在工业领域的大规模应用,企业内部网络急需部署一个可以用于机器学习算法验证、调试以及应用的系统。该系统应具备足够的算力,同时支持研发人员并发运行多个计算任务,并在计算任务结束后返回结果。该文将使用消息队列和分布式进程调度框架设计一个满足企业内部需求的分布式机器学习平台。 关键词:分布式系统;机器学习;消息队列 中图分类号:TP393 文献标识码:A 文章编号:1009-3044(2018)01-0201-02 1 概述 随着人工智能应用的日益成熟,越来越多的工业领域使用了机器学习算法解决实际问题。在企业的日常运作中会产生大量的业务数据,研究人员使用机器学习算法从这些业务数据中挖掘出有更价值的信息。因此,企业内部中需要一个能够支持研发人员并发运行机器学习算法的平台,本文将使用Python作为主要编程语言,利用Celery分布式进程调度系统分配计算任务、利用Redis临时存储计算作业临时消息及计算结果、利用TensorFlow/Keras执行机器学习算法,最终通过Flask将平台发布为网页应用,向研发人员提供机器学习算法验证、调试等服务。 2 Celery技术简介 Celery是一个架构简洁、配置灵活且具有高可用性的分布式任务队列框架,它擅长通过分布式系统并行处理大量作业,并提供维护此类系统的必要工具。Celery专注于实时处理任务队列,同时也支持任务调度。 Celery的架构由三部分组成,消息队列系统(message broker)、任务执行单元(celery worker)和作业结果储存(task store backend)。Celery本身虽然不提供消息服务,但是兼容大部分常见的消息队列(如Redis、RabbitMQ等)。Celery worker可以運行在分布式系统的节点上。作业结果可以以内存数据库(如Redis、memcached)、数据库(MongoDB)、对象关系映射(SQLAlchemy、DjangoORM)等多种方式进行数据持久化。 此外,Celery还可以与gevent等框架集成,以支持大规模并发特性。同时也支持诸如pickle、json、yaml等多种序列化格式。 3 系统结构设计 分布式机器学习系统通过Celery进行作业调度,Celery接受前端的任务(例如通过Flask接受用户提交的任务)后,在多台运行Celery进程的主机上分发作业并执行。由于机器学习算法普遍具有运算量大、作业时间长等特点,所以各执行中的作业应定期将作业进度回写至消息队列或持久化系统中,以在前端(如用户在Flask上的提交作业的页面)实时更新作业进度。在作业结束时Celery worker将结果写入消息队列或持久化系统,前端可以从中读取结果。 3 系统结构设计 本文以Flask项目为例组织代码,使用Redis作为消息队列,使用SQLAlchemy进行数据持久化,使用TensorFlow/Keras作为机器学习计算平台。 3.1 Flask项目结构 Flask项目的源码结构如下: [- config.py - run.py - app/ |- app/__init__.py |- app/models.py |- app/views.py - preprocess/ |- preprocess/__init__.py |- preprocess/models.py |- preprocess/views.py - analysis/ |- analysis/__init__.py |- analysis/models.py |- analysis/views.py |- analysis/datamodel.py |- analysis/tasks.py ] 一个典型的分布式机器学习项目至少应包含项目配置(config.py)、自动化脚本(run.py)、用户及权限(app模块)、数据预处理(preprocess模块)、模型训练(analysis模块)这五部分: 1) 项目配置包括Flask、SQLAlchemy、Redis、Celery、TensorFlow/Keras等模块的全局配置,以及项目相关常量(如模型保存路径、上传类型限制等); 2) 自动化脚本包括项目初始化、部署、启动、停止、升级、调试等命令行脚本; 3) 用户及权限是B/S系统的必备功能,包括用户-角色的多对多映射、用户-角色的增删改等功能; 4) 数据预处理模块负责对用户上传的数据进行结构化处理,并将结构化数据写入内存数据库等持久化系统,以供之后的机器学习算法或前端可视化模块快速调用; 5) 模型训练模块应具备模型建立(模型结构及模型规模参数可配置)、训练配置(批训练集及迭代次数等参数可调整)、训练作业监控(模型误差及迭代次数实时消息回写)、训练结果保存(模型及模型运算结果保存)等机器学习系统常见功能。 3.2 Redis消息队列 Redis系统部署在消息服务器上,作为消息中间人(Message Broker)的角色,配合Celery在分布式系统中调度并发的机器学习作业。 此外,Redis本身具备键值对内存数据库的功能,利用Redis极高的并发读写速度,可以用于暂存Celery作业的中间状态,以供前端实时监控机器学习计算作业的进度。 Redis以消息中间人的形式集成在Celery中: [# - app/ # |- app/__init__.py from celery import Celery celery = Celery('mltasks', broker='redis:// 使用Redis在Flask项目中进行数据保存与读取: [from redis import Redis rds = Redis(host=' ... rds.set(result_uuid, result) ... result = rds.get(result_uuid) ] 3.3 Celery进程调度 Celery进程运行在分布式系统的所有主机上,各Celery进程通过Redis消息队列交换信息,协调资源。 在Flask项目中,Celery通常在程序入口处初始化: [# - app/ # |- app/__init__.py from celery import Celery celery = Celery('mltasks', broker='redis:// 项目中涉及机器学习模型训练的代码应该放在Celery作业中运行,这部分代码应集中管理在作业模块中,将每一个耗时计算封装为独立的函数,并给函数添加@celery.task修饰符,供Flask项目代码异步调用。 函数首先使用训练集训练模型,训练过程产生的日志可以通過“logging_uuid”参数实时写入Redis消息队列供前端显示训练进度;然后将训练完成的模型保存至文件系统,同时使用训练好的模型对验证集进行预测,以查看模型泛化性能;最后将验证结果保存在Redis消息系统中,供前端显示训练结果: [# - analysis/ # |- analysis/tasks.py from app import celery from app import rds @celery.task def training_task(data, training_uuid, logging_uuid, epochs): model = SomeModel() vs = data['validate_set'] ts = data['training_set'] # 使用训练集训练模型,并设置此次训练的日志回写地址为logging_uuid training_history = model.train(ts, logging_uuid, epochs) # 使用验证集测试模型 predict = model.predict(vs) # 保存模型 model.save() # 将训练结果保存至Redis系统 training_result = json.dumps({'status': 'success', 'predict': predict}) rds.set(training_uuid, training_result) # 将训练日志状态设为训练完成,以通知前端更新显示 logging_result = json.dumps({'model_state': 'trained'}) rds.set(logging_uuid, logging_result) ] 类似的耗时操作,如训练模型、保存模型、加载模型、使用模型预测等耗时操作均可封装在上述Celery作业中,当Flask项目需要执行这些耗时操作时,使用Celery作业提供的“delay”异步调用: [# - analysis/ # |- analysis/views.py from app import celery from app import rds from flask import request from flask import Response from config import HOST_ID from analysis.tasks import training_task import uuid @app.route('/api/v1/analysis/somedata/ def data_model_training_service(data_id): if request.args.get('action') == 'train': data = some_get_data_function(data_id) # 为celery的各作业生成唯一的uuid training_uuid = HOST_ID + str(uuid.uuid1()) logging_uuid = HOST_ID + str(uuid.uuid1()) # 使用celery异步执行作业 training_task.delay(data, training_uuid, logging_uuid, epochs=int(request.args.get('epochs'))) # 执行其他操作并返回 return Response(json.dumps ({'status': 'success'})) ] 上述函数以异步的方式执行模型训练,执行后即刻返回,并指定Redis系统中的日志回写地址以及训练结果回写地址。使用“celery.task.delay”方式调用的函数均运行在Celery worker中,“delay”函数的参数列表就是“@celery.task”修饰符修饰对象的参数列表,每次“delay”调用均使用独立进程。 这种异步调用可以保证在Flask项目中该路由请求不会被耗时操作阻塞,导致前端界面无响应。为了配合使用这类耗时操作的异步调用,前端及Flask项目需要改变异步操作相关代码的编写模式: 1) 每一个耗时操作的代码应至少分为两部分,第一部分负责设置回写地址并异步执行耗时操作;第二部分负责从日志回写地址中取回耗时操作执行进度。 2) 当前端需要执行耗时操作时,向Flask项目发送开始执行的请求,Flask生成唯一的Redis回写地址并使用“delay”函数异步调用,在响应中通知前端回写地址。 3) 前端程序应定期向Flask项目发送请求,使用日志回写地址查询日志信息,并在界面中跟新耗时操作的执行进度。 使用日志回写地址取回耗时操作执行进度信息的代码示例如下: [# - analysis/ # |- analysis/views.py from app import rds @app.route('/api/v1/analysis/somedata/ def data_model_log_service(data_id): return Response(r.get(request.args.get('redisLoggingTaskID'))) ] 完成各耗时操作函数的编写后,将Flask项目及其运行环境分发在分布式系统的其他主机上,之后在这些主机上分别启动celery worker,这些进程将通过Redis主机上的消息队列交换数据并进行进程调度: [celery worker -A app.celery --loglevel=debug ] 4 结束语 本文介绍了使用Celery分布式进程调度系统搭建用于企业内部网络的机器学习计算平台。该平台架构简洁,具备较强的可扩展性,容易通过添加主机的方式线性提升系統计算能力,在企业内部的机器学习算法验证、调试及应用中发挥了重要作用,是一种易于部署实现的平台。 |
随便看 |
|
科学优质学术资源、百科知识分享平台,免费提供知识科普、生活经验分享、中外学术论文、各类范文、学术文献、教学资料、学术期刊、会议、报纸、杂志、工具书等各类资源检索、在线阅读和软件app下载服务。