/----------\ |CFtpFSM1 | +--->| cftp |--------+ | \----------/ +-----------+ +----------------+<---+ /----------\ |CamelSender| | UploadManager |<------->|CFtpFSM2 | | File2|---->| remote actor |<----+ | cftp |--------+ FTP Server +-----------+ +----------------+<--+ | \----------/ | | /----------\ localPath | +-->|CFtpFSM3 |--------+ dest dir | | cftp | | \----------/ | /----------\ +---->|CFtpFSMn |--------+ | cftp | \----------/
- 实现用akka-actor FSM 管理的并发ftp长连接客户端
- 用remote actor接收上传文件请求,收集上传文件的状态,并提供消息接口操作内部数据
- 使用akka-camel实现本地目录的监视及生成文件上传请求消息
- 消息方向(参考上面的ascii diagram):CamelSender (to) FileUploadManager (from&to) (多个)CFtpFSM
- CFtpFSM 对 CFtp类进行调用,有4个状态UnAvailable, Disconnected, Connected, LoggedIn,在LoggedIn状态接收文件上传请求(UploadFile message),保持ftp连接(通过定时NOOP),网络断开时重连,并向FileUploadManager发送文件上传成功或者失败的状态消息
- FileUploadManager在满足状态查询同时,定时扫描文件上传状态,对上传失败的文件进行重试
- CamelSender定时扫描本地目录,将新建的文件生成文件上传请求,这一层可以在满足FileUploadManager消息接口的情况下使用其他的实现方式
详见log4j.properties
在classpath添加secret.properties文件,下面是示例配置,黑体的为关键配置,需要根据实际情况修改
cftp.host=192.168.32.103
cftp.port=21
cftp.serverEncoding=GBK
cftp.user=username
cftp.pass=password
cftp.ddir=upload
cftpfsm.activeTimeout=13
cftpfsm.disconTimeout=23
cftpfsm.routerId=cftpSenderRouter
cftpfsm.count=2
cftpfsm.autoStart=true
cftpmgr.serviceId=cftp:service
cftpmgr.host=localhost
cftpmgr.port=2552
cftpmgr.mvAfterSucc=true
cftpmgr.initDelay=30
cftpmgr.delay=10
cftpmgr.maxRetry=10
cftpmgr.dumpDir=e:/temp1
cftpmgr.bakPath=e:/temp1/b
cftpcamel.localPath=e:/upload
cftpcamel.moveMode=true
cftpcamel.moveDir=.camel
cftp.serverEncoding | ftp服务器操作系统环境编码,中文Windows一般是GBK,*nix一般是UTF-8 |
cftp.ddir | ftp客户端登陆成功后切换到的工作路径,用作CWD指令的参数 |
cftpfsm.count | ftp客户端并发连接数 |
cftpfsm.activeTimeout | 为保持ftp服务器不自动断开,ftp客户端每隔activeTimeout秒向服务器发送NOOP指令 |
cftpfsm.disconTimeout | 因一些异常状况导致ftp客户端断开连接后,到下一次自动重连的间隔时间 |
cftpmgr.host | akka remote actor 监听的hostname,也用于client端使用remote.actorFor的参数 |
cftpmgr.port | akka remote actor 监听的端口,也用于client端使用remote.actorFor的参数 |
cftpmgr.initDelay | 自动向UploadFileManger发送RetryAll消息(用于重传文件)的初始间隔秒数 |
cftpmgr.delay | 每次自动向UploadFileManger发送RetryAll消息的间隔秒数 |
cftpmgr.maxRetry | 文件上传失败后重新上传的尝试次数 |
cftpmgr.dumpDir= | UploadFileManger接收到DumpToFile消息后会在此目录下生成dump.(系统时间数字串) |
cftpmgr.bakPath | 文件上传成功之后会被UploadFileManger移动到这个目录,如果重名,会添加系统时间数字串重试 |
cftpcamel.localPath | 待上传文件的路径,由 Camel Service 定时扫描 |
(目前直接在sbt中运行已经能满足要求,计划添加直接由java执行的方式)
- 启动FSM以及FileUploadManager service,com.shsz.young.cftp.CFtpFSM.runDefault
- 启动CamelSender,com.shsz.young.cftp.sender.CamelSender.start
- 停止CamelSender,com.shsz.young.cftp.sender.CamelSender.stop
- 停止FSM以及service,com.shsz.young.cftp.CFtpFSM.shutdown
- 默认配置下内部的文件操作:cftpcamel.localPath的文件由camel移动至此目录下的cftpcamel.moveDir,由camel向FileUploadManager发送文件上传请求消息后,cftp实现文件的上传,并将上传成功消息发送给FileUploadManager,由FileUploadManager把上传成功的文件由cftpcamel.moveDir移动至cftpmgr.bakPath,上传失败的文件会被FileUploadManager定时生成重新上传的请求消息,默认情况下会重试10次(cftpmgr.maxRetry)
- 获取FileUploadManager内部保持的文件状态结构,
val m = CFtpFSM.DEFAULTMANAGER |
用于同一JVM |
val m = remote.actorFor("cftp:service", "localhost", "2552") |
用于不同JVM |
m ! Ping |
测试每个并发ftp客户端是否活动,会在UploadFileManager运行的jvm输出日志 |
m ! Dump |
在UploadFileManager运行的jvm输出日志,包含内部管理的上传文件列表 |
m ! DumpToFile |
功能同Dump,UploadFileManger接收到消息后会在cftpmgr.dumpDir目录下生成dump.(系统时间数字串) |
m ! Pure |
清除UploadFileManager中所有重试次数达到上限的文件数据项,一般用于维护时,使用DumpToFile分析之后 |
m ! Clear |
清除上传文件列表,慎用 |
中文文件名的文件无法上传,目前生产环境已能满足要求,计划完善- 计划使用Supervisor实现对CFtpFSM的重启,目前如果预计网络可能出现问题,需要监控cftpcamel.moveDir和cftpcamel.localPath
- 添加断点续传的功能,满足大文件的上传环境