BDOA为符合DOA标准的一套完整实现的开源项目,支持标识注册、解析、寻址、查询等功能,主要包括标识系统、注册表和仓库三大子系统,并支持IRP和DOIP两大协议。本文档为BDOA的介绍文档,介绍了BDOA涉及的数字对象体系结构的相关概念以及架构设计等内容,相关代码会在2021年9月份进行开源。
数字对象体系结构(Digital Object Architecture, DOA)是现有Internet体系结构的一个逻辑扩展,它支持不同系统之间的数据互操作需求,而不仅仅是将数字形式的信息从Internet的一个位置传送到另一个位置。数字对象体系结构中的基本元素是数字对象(Digital Object),是对任意信息系统数据资源的统一抽象。数字对象分为四个部分:标识、元数据、状态数据以及实体。其中,标识永久且唯一的指向一个数字对象,是数字对象永久不变的属性;元数据是数字对象的描述信息,用于数字对象的检索和发现;状态数据包含数字对象当前位置、访问入口、访问方式等信息,用于数字对象的定位和访问;实体则是数字对象所包含的实际内容,可以是任何比特序列或一系列比特序列的集合。标识和状态数据的分离也使得数字对象的标识不在与数字对象的访问入口紧耦合,使得无论是否在互联网环境中,数字对象都可以被共享、访问,进而实现普适环境下的信息系统互操作。 DO是对互联网资源的抽象,DO代表互联网中有意义或者是有价值的资源。DO的表现形式可以是一个比特序列,或者是多个比特序列的集合;也可以是一个对外提供接口的程序。就像互联网中每一个Host都有一个IP地址作为标识一样,在DOA中每个DO同样有一个标识,用于对这个DO进行唯一的识别,这个标识叫做DOID。DOA中有两个基本协议:标识/解析协议IRP(Identifier/Resolution Protocol)和数字对象接口协议DOIP(Digital Object Interface Protocol)。 DOA中有三个基本的组成部分,分别是标识/解析系统(Identifier/Resolution System)负责DO的标识和解析、仓库系统(Repository System)负责DO的存储和访问、以及注册表系统负责DO元信息的注册和DO的搜索(Registry System)。
IRP协议规定了DO标识的创建、修改、删除和解析方式,同样指定了标识的语法以及分配方案。IRP协议之于DOA就如同IP协议、路由协议之于Internet。在DOA中,通过IRP协议解析一个DO的标识可以查询到该标识对应的DO的状态信息,其中包括DO的存储位置、所有者等和DO的访问直接相关的信息。
标识/解析系统是DOA体系中三个基本构件之一,其主要功能如下:
标识注册与解析系统包括LRS和GRS两种,即局部标识注册与解析系统以及全局标识注册与解析系统。
主要有如下功能:
一种特殊的IRS,访问地址全局可知且不变。主要有如下功能:
在数字对象体系结构中,Registry负责对数字对象元信息的管理。Registry和数字对象的实际访问服务提供者Repository逻辑上一般是一对一或一对多的关系,即:在物理上将Repository和Registry部署在一起,同时对外提供数字对象的访问和搜索功能;或者将Registry作为独立构件部署,此时一个Registry一般管理多个Repository中数字对象的元信息,Registry通过标准的数字对象接口协议对外提供元数据的管理服务以及数字对象搜索服务。在DOA中,有一种特殊类型的DOMeta DO。Meta DO中包含的是其他DO的描述信息、标识的键值对,并以固定的格式序列化。通过检索Meta DO中的数据,可以根据关键字搜索到目标DO的标识。注册表系统是一种特殊的仓库系统,仅负责Meta DO的存储、管理。注册表系统同样以DOIP协议的标准对外提供服务,创建、管理Meta DO,并支持DOIP协议中的Search操作。
仓库系统对外提供DO的访问管理及持久化服务。仓库系统需要对外暴露访问接口,接收本文所定义的DOIP请求消息,根据DO的标识和操作标识,处理本地存储的DO,并根据处理结果和协议规范,返回DOIP响应消息。仓库系统提供的DOIP服务也可以被抽象成DO,并在标识/解析系统中注册。根据仓库的标识,可以解析到仓库的访问地址,提供服务的方式等相关信息。DOIP协议对外屏蔽了仓库系统的实现方式,仓库系统可以以任何形式进行实现,只需要对外暴露可以接受DOIP请求消息的接口即可。DOIP服务接口也可以建立在多种传输协议之上,如:http,tcp,udp等。具体的接口形式在标识解析系统中描述清楚即可。关于DOIP 的详细描述,将在下文阐述。
数字对象接口协议(Digital Object Interface Protocol, DOIP)规定了对数字对象的操作语义,以及互操作消息的传输语法。DOIP的操作语义在不同系统中的具体实现可以不同,但对外暴露的接口都需要满足DOIP协议的统一标准。由此数字对象应用系统可以以统一的形式访问不同系统提供的数字对象,而操作的具体细节对应用系统完全透明。
数字对象实体的数据结构设计 数字对象的状态信息数据结构设计 数字对象的元数据数据结构设计
BDOA部署
数字对象创建流程 数字对象访问流程 数字对象修改、删除流程 数字对象索引、搜索流程
数字对象体系结构DOA(Digital Object Architecture)是互联网之父,图灵奖得主Robert E. Kahn提出的一种互联网的逻辑扩展, 以数字对象的形式抽象互联网资源,使得系统之间可以直接管理信息,而非仅将数据以比特流的形式从一个节点传递到另一个节点。 以数据为中心的方式实现信息系统之间的互操作。 关于DOA的详细介绍,请参考www.dona.net
本项目是北京大学操作系统研究中心开发的DOA架构的完整实现,包含DOA两个核心协议的参考实现SDK:IRP(Identifier/Resolution Protocol)、DOIP(Digital Object Interface Protocol) 以及基于协议SDK开发的DOA三个基本构件的参考实现:Repository、Registry、Identifier/Resolution System)
项目包含6个模块
向全局标识解析系统GRS(Global Resolution System)申请前缀,得到GRS分配的前缀和JWK密钥。(可选)
启动irs/src/mian/java/org.bdware.irs/IrsBoot,会启动IRS和GRS服务,以默认账号登陆可以进入GRS的前缀分配界面,分配前缀后,在irsCofig.json文件中可以修改irs的启动信息。可通过port参数修改监听端口,XXX配置前缀、密钥(默认前缀为local)
运行client模块下org.bdware.utils.UserCreator,向IRS注册用户,会在当前目录生成"用户名.jwk"文件,其中为JWK密钥, 用于启动Repository、Registry或运行DOAClient。也可以通过其他JWK生成工具生成,保存为文件,例如mkjwk 。
public class UserCreator {
static String userID = "local/dou.test";
static String IRSAddress = "http://127.0.0.1:10001";
public static void main(String[] args) throws Exception {
// 创建JWK并向LRS注册用户标识
// 参考实际代码
}
}
复制config目录下doipConfigTemplate.json至项目根目录,重命名为repoConfig.json,按照LRS地址和用户id修改配置文件,其repoID置空。 其中listeners代表了DOIP服务所监听的端口和协议,同一个DOIP服务可以监听多个不同的端口,项目中支持的通讯协议包括: TCP(url示例:tcp://127.0.0.1:1717)、UDP(url示例:udp://127.0.0.1:1717)、TLS(url示例:tls://127.0.0.1:1717)、以及WebSocket(url示例:ws://127.0.0.1:1717/ws/)。
{
"IRSAddress": "http://127.0.0.1:10001/", //LRS服务地址
"userID": "local/dou.generated", //UserCreator创建的UserID
"jwkPath": "./dou.generated.jwk", //UserCreator创建的jwk文件地址
"repoID": "", //初始为空,启动时向LRS注册获取
"listeners": "[{\"url\":\"tcp://127.0.0.1:1717\",\"protocolVersion\":\"2.1\",\"messageFormat\":\"packet\"}]", //DOIP服务地址,可填多个(参考org.bdware.repository.examples.MultipleListenerRepositoryMain)
"serviceDescription": "Local BDRepository", //doip服务描述
"serviceName": "LocalBDRepo", //doip服务名
"type": "repository" //doip服务类型,repository/registry
}
启动org.bdware.repository.BDRepositoryMain,会同时启动DOIP服务以及Web管理端,Web管理端口默认为8080。通过[url]进入Repository Web管理界面。
复制config目录下doipConfigTemplate.json至项目根目录,重命名为regConfig.json,按照LRS地址和用户id修改配置文件,其repoID置空
{
"IRSAddress": "http://127.0.0.1:10001/", //LRS服务地址
"userID": "local/dou.generated", //UserCreator创建的UserID
"jwkPath": "./dou.generated.jwk", //UserCreator创建的jwk文件地址
"repoID": "", //初始为空,启动时向LRS注册获取
"listeners": "[{\"url\":\"tcp://127.0.0.1:1717\",\"protocolVersion\":\"2.1\",\"messageFormat\":\"packet\"}]", //DOIP服务地址,可填多个(参考org.bdware.repository.examples.MultipleListenerRepositoryMain)
"serviceDescription": "Local BDRepository", //doip服务描述
"serviceName": "LocalBDRepo", //doip服务名
"type": "repository" //doip服务类型,repository/registry
}
启动org.bdware.repository.BDRegistryMain,会同时启动DOIP服务以及Web管理端,Web管理端口默认为8080。通过[url]进入Registry Web管理界面,其中提供图形化界面创建、管理DO。
client模块下,org.bdware.doaclient.DOAClient类封装了DOIPClient接口,提供了和Repository和Registry进行DOIP交互的同步接口。DOAClient用法参考类 client模块测试用例client/src/test/java/DOAClientTest类。 DOAClient实例的初始化需要传入JWK.json路径以及目标LRS的服务路径
public class DOAClient {
public DOAClient(String jwkPath, String IRSAddress){
try {
GlobalCertifications.loadKeysFromJWK(jwkPath);
} catch (Exception e) {
e.printStackTrace();
}
GlobalConfigurations.IRSAddress = IRSAddress;
container = new ResponseContainer();
cb = new SyncCallback(container);
init();
}
//其他方法详见源码
}
数字对象是DOA中的基本元素,一个数字对象由其标识id,类型type,属性attributes,以及元素集elements组成。 关于数字对象的基础定义,请参见DOIP协议文本Digital Object章节。 本项目在实施过程中对数字对象进行了扩展,本文主要着重介绍扩展部分。
首先是在数字对象元数据Metadata方面的扩展,DOWithMetadata以及MetaDO
在DO的attributes中预留关键字"metadata"作为数字对象的元数据,对应一个JSONObject。 在BDRepositoryHandler的handleCreate方法中会根据此字段识别数字对象的元数据,并根据元数据建立数字对象的索引。 代码实现参见org.bdware.doip.core.model.digitalObject.DOWithMetadata
public class DOWithMetadata extends DigitalObject{
public static final String METADATA = "metadata";
public static DOWithMetadata fromDO(DigitalObject digitalObject){
if(digitalObject.attributes == null || digitalObject.attributes.get(METADATA) == null){
return null;
}
return (DOWithMetadata)digitalObject;
}
public DOWithMetadata(String id, DoType type) {
super(id, type);
}
public void addMetadata(String key, String value){
if(attributes==null || attributes.get(METADATA) == null){
addAttribute(METADATA, new JsonObject());
}
attributes.get(METADATA).getAsJsonObject().addProperty(key,value);
}
public JsonObject getMetadata(){
if(attributes.get(METADATA) == null) return null;
return attributes.get(METADATA).getAsJsonObject();
}
public String getMetadata(String key){
if(attributes.get(METADATA) == null) return null;
return attributes.get(METADATA).getAsJsonObject().get(key).getAsString();
}
public void setMetadata(JsonObject jo){
this.addAttribute(METADATA,jo);
}
}
MetaDO是一类特殊的DO,代表某个原始DO的元数据,MetaDO的type为"0.TYPE/DO.Metadata",id与原DO相同。 MetaDO的有效数据仅包含id、type以及attributes中的metadata。 可以通过向Registry发送DOIP请求来管理MetaDO,Registry通过处理对MetaDO的增删改查请求来实现对原DO的索引,并提供对DO的搜索服务。 org.bdware.doip.core.model.metadata.MetaDO的代码实现如下:
public class MetaDO extends DOWithMetadata {
public MetaDO(String id){
super(id,DoType.Metadata);
}
public static MetaDO fromDO(DigitalObject originalDO){
MetaDO meta = new MetaDO(originalDO.id);
if(originalDO.attributes == null || originalDO.attributes.get(METADATA) == null){
return null;
}
meta.addAttribute(METADATA,originalDO.attributes.get(METADATA));
return meta;
}
public static MetaDO fromDOWithMetadata(DOWithMetadata doWithMetadata){
MetaDO meta = new MetaDO(doWithMetadata.id);
meta.addAttribute(METADATA,doWithMetadata.getMetadata());
return meta;
}
}
其次是在DO的attributes中规定了预留字段doOwner,用于表达该数字对象的所有者。 BDRepository和BDRegistry会通过该字段识别数字对象的实际所有者,并进行相应的权限设置。 org.bdware.doip.core.model.digitalObject.DOWithOwner的代码实现如下:
public class DOWithOwner extends DigitalObject{
public static final String DO_OWNER = "doOwner";
public static DOWithOwner fromDO(DigitalObject digitalObject){
if(digitalObject.attributes == null || digitalObject.attributes.get(DO_OWNER) == null){
return null;
}
return (DOWithOwner)digitalObject;
}
public DOWithOwner(String id, DoType type) {
super(id, type);
}
public void setDoOwner(String ownerID){
addAttribute(DO_OWNER,ownerID);
}
public String getDoOwner(){
if(attributes.get(DO_OWNER) == null) return null;
return attributes.get(DO_OWNER).getAsString();
}
}
标识/解析系统是DOA体系中三个基本构件之一,其主要功能如下:
本项目的LRS标识解析系统不仅会为DO分配标识,还会基于LRS做身份认证和DOIP服务寻址,即:
仓库系统对外提供DO的访问管理及持久化服务。 仓库系统需要对外暴露访问接口,接收DOIP请求消息,根据DO的标识和操作标识,处理本地存储的DO,并根据处理结果和协议规范,返回DOIP响应消息。 仓库系统同样具有全网唯一标识,并在标识/解析系统中注册。根据仓库的标识,可以解析到仓库的访问地址,提供服务的方式等相关信息。 DOIP协议对外屏蔽了仓库系统的实现方式,仓库系统可以以任何形式进行实现,只需要对外暴露可以接受DOIP请求消息的接口即可。 DOIP服务接口也可以建立在多种传输协议之上,如:websocket,tcp,udp等。
本项目提供了数字对象仓库的一种参考实现BDRepository,采用RocksDB作为持久化存储。仓库入口为org.bdware.repository.BDRepositoryMain, 分别启动仓库服务主线程org.bdware.repository.main.BDRepository, 以及Web管理服务org.bdware.repository.webService.Application。
仓库核心代码为org.bdware.repository.handler.BDRepositoryHandler,其中定义了对DOIP消息的具体处理逻辑。 BDRepositoryHandler通过重写handleCreate,handleHello等方法,实现了对7个DOIP基本操作的响应,根据DOIP消息头的Parameter中Operation的值决定调用哪个方法处理DOIP消息。
public class BDRepositoryHandler extends RegistryHandlerBase {
static Logger logger = Logger.getLogger(BDRepositoryHandler.class);
BDRepoStorage store; //数字对象的持久化存储,基于RocksDB
DoIndexer indexer; //本地创建数字对象索引,基于Lucene
HashMap<String,RemoteReg> remoteRegs; //记录该Repo接入的远程Registry,支持基于Registry的动态组网
static final String REMOTE_REG = "REMOTEREG";
static final String DO_OWNER = "doOwner";
public BDRepositoryHandler(DoipServiceInfo info, BDRepoStorage store) throws RocksDBException {
super(info);
this.store = store;
if(store.getRaw(REMOTE_REG.getBytes()) != null){
remoteRegs = loadRegList();
} else
remoteRegs = new HashMap<>();
}
public BDRepositoryHandler(DoipServiceInfo info, BDRepoStorage store, DoIndexer indexer) throws RocksDBException {
this(info,store);
this.indexer = indexer;
}
@Override
public DoipMessage handleHello(DoipMessage request) {
//处理DoipMessageHeader.parameter中operation=0.DOIP/Op.Delete 的消息
}
@Override
public DoipMessage handleListOps(DoipMessage request) {
//处理DoipMessageHeader.parameter中operation=0.DOIP/Op.Delete 的消息
}
@Override
public DoipMessage handleCreate(DoipMessage request) {
//处理DoipMessageHeader.parameter中operation=0.DOIP/Op.Delete 的消息
}
@Override
public DoipMessage handleUpdate(DoipMessage request) {
//处理DoipMessageHeader.parameter中operation=0.DOIP/Op.Delete 的消息
}
@Override
public DoipMessage handleDelete(DoipMessage request) {
//处理DoipMessageHeader.parameter中operation=0.DOIP/Op.Delete 的消息
}
@Override
public DoipMessage handleRetrieve(DoipMessage request) {
//处理DoipMessageHeader.parameter中operation=0.DOIP/Op.Retrieve 的消息
}
@Override
public DoipMessage handleSearch(DoipMessage request) {
//处理DoipMessageHeader.parameter中operation=0.DOIP/Op.Search的消息
}
}
可以通过扩展DOIPServiceHandler定制化实现自己的仓库,扩展细节参见扩展与自定义章节。
仓库的Web管理服务基于SpringBoot+React实现,启动后默认通过8080端口访问管理界面,基于用户标识和JWK实现身份验证和用户登录。
注册表系统负责存储管理DO的元信息,并提供搜索服务。通过对MetaDO的增删改查实现对数字对象索引的创建、修改等。有关MetaDO的描述,见数字对象DigitalObject部分。 注册表和仓库Repository同样基于DOIP提供服务,仅在处理DOIP消息的Handler实现上有区别。Registry的Handler参见org.bdware.registry.handler.BDRegistryHandler。
BDRegistryHandler除了支持基本的DOIP操作之外,还扩展了两个操作:Op.Join以及Op.Quit,分别处理DOIPMessageHeader中operation="Op.Join"以及"Op.Quit"的消息,以支持其他Repository或Registry的加入和退出。 当BDRegistry接收到搜索请求时,不仅会进行本地搜索,还会将搜索请求转发至所有已接入的Registry/Repository,获取反馈、汇聚搜索结果之后返回给请求客户端。 通过注解的方式可以轻松的扩展DOIP操作,更多详细信息可以参见#扩展与自定义章节#
public class BDRegistryHandler extends RegistryHandlerBase {
/*
其他部分详见具体代码,和BDRepository代码类似
*/
//扩展操作Op.Join,允许其他Repository或Registry接入,提供分布式搜索功能
//DoipMessageHeader.parameter中operation=Op.Join
@Op(op = BasicOperations.Extension,name = "Op.Join")
public DoipMessage handleJoin(DoipMessage request){
if(request.header.parameters.attributes == null ||request.header.parameters.attributes.get("repoID")== null){
logger.info("invalid join request: repoID not found");
return replyStringWithStatus(request, "invalid join request: repoID not found.", DoipResponseCode.Invalid);
}
String repoID = request.header.parameters.attributes.get("repoID").getAsString();
String repoUrl = request.header.parameters.attributes.get("repoUrl").getAsString();
remoteRegs.put(repoID,repoUrl);
saveRegFederation();
return replyString(request,"success");
}
//扩展操作Op.Quit,允许其他Repository或Registry接入,提供分布式搜索功能
//DoipMessageHeader.parameter中operation=Op.Quit
@Op(op = BasicOperations.Extension,name = "Op.Quit")
public DoipMessage handleQuit(DoipMessage request){
if(request.header.parameters.attributes == null ||request.header.parameters.attributes.get("repoID")== null){
logger.info("invalid join request: repoID not found");
return replyStringWithStatus(request, "invalid join request: repoID not found.", DoipResponseCode.Invalid);
}
String repoID = request.header.parameters.attributes.get("repoID").getAsString();
remoteRegs.remove(repoID);
saveRegFederation();
return replyString(request,"success");
}
}
DOA本质上是一个网络系统,物理上分布于不同位置的系统,基于网络基础设施相互连接,并通过IRP、DOIP两个标准协议实现系统之间的协作。 逻辑上功能不同的构件IRS、Registry、Repository连接在一起,形成数联网基础设施,共同为数字对象的访问、运行提供基础。 DOA组网示例如下图所示。
细化来看,DOA之间的组网可以分为三个部分,GRS和LRS之间的组网、LRS和Repository/Registry之间的组网以及Repository和Registry之间的组网。
标识解析系统IRS逻辑上分为两层,顶层是GRS,下层是LRS。GRS负责为每个LRS分配一个唯一的标识(即前缀),并提供前缀的解析服务,根据前缀返回LRS的服务地址。 GRS管理全局所有LRS的状态信息。
在本项目的实现中,管理员登录GRS管理界面,采用手动分配的方式分配一个前缀以及对应的密钥。GRS的服务地址公开可知,LRS在启动时需向GRS更新自己的状态,确保通过GRS可以获取到LRS的状态。
Repository和Registry启动前需要在配置文件中指定所连接的LRS,初次启动Repository和Registry会向该LRS申请标识,标识的前缀为LRS的标识,后缀则由LRS生成,保证唯一。 拥有标识后,Repository和Registry需要和前缀指定的LRS的连接,才能确保自己可以被其他节点解析到。
目前LRS和Repository/Registry之间的组网是静态的,在启动时就需要指定所连接的LRS且只能和前缀指定的LRS连接。动态的LRS连接方式计划后续版本中推出。
Repository在启动之后可以动态的选择一个或多个Registry进行连接,连接的方式分为"托管"、"接入"两种,托管连接下,Repository在接收到新的创建DO请求时会将DO的元数据 发送给所托管的Registry,Registry在本地根据元数据建立DO索引,提供该DO的搜索服务。(目前版本不会将存量的DO元数据托管给新接入的Registry) 在接入模式下,Repository不会将元数据发送给Registry,而是将自己的服务地址提交给Registry,Registry在接收到搜索请求后会将搜索请求转发至所有接入的Repository上 合并Repository的搜索结果后返回给用户。
技术上,接入功能通过扩展DOIP,新增操作Op.Join和Op.Quit实现,Repository和Registry连接通过向Registry发送Op.Join/Op.Quit请求分别实现Repository的接入和退出。
同理,Registry也可以通过Op.Join请求接入其他Registry,实现分布式搜索的功能。
DOIP和IRP协议仅对构件之间交互的语法语义进行了规范,构建内部对于具体消息的处理逻辑可以根据需求自行实现。 本项目给出了一种完整实现的版本,内置了基于RocksDB的持久化存储、基于Lucene的搜索引擎等,可以按照消息语义实现基础的数字对象增删改查搜的功能。 结合具体应用场景,开发人员可以根据需求自定义实现Repository和Registry的处理逻辑,实现更丰富的功能。
通过实现RepositoryHandler和RegistryHandler接口中的方法,可以实现不同的DOOIP消息处理逻辑。 通过DoipRequestHandler.setRepositoryHandler可以在一个Repository或Registry中应用Handler。
RepositoryHandler代码如下
public interface RepositoryHandler {
@Op(op = BasicOperations.Hello)
DoipMessage handleHello(DoipMessage request);
@Op(op = BasicOperations.ListOps)
DoipMessage handleListOps(DoipMessage request);
@Op(op = BasicOperations.Create)
DoipMessage handleCreate(DoipMessage request);
@Op(op = BasicOperations.Update)
DoipMessage handleUpdate(DoipMessage request);
@Op(op = BasicOperations.Delete)
DoipMessage handleDelete(DoipMessage request);
@Op(op = BasicOperations.Retrieve)
DoipMessage handleRetrieve(DoipMessage request);
}
不同的接口被赋予了不同的注解,在接收到DOIP消息时,会根据DOIPMessage.MessageHeader.HeaderParameter.operation判断其操作码调用相应的操作。 项目中对该接口的实现参考BDRepositoryHandler。对于继承的基本DOIP操作BasicOperations,不需要在实例化的方法上增加注解,在RequestHandlerImpl初始化时会根据其集成的方法逐层向上查找注解,直至找到 RepositoryHandler和RegistryHandler方法上的注解。 示例代码如下:
public class RequestHandlerImpl implements DoipRequestHandler {
Map<String, Method> handlers;
static Logger logger = Logger.getLogger(NettyServerHandler.class);
protected RepositoryHandler doipHandler;
public RequestHandlerImpl(RepositoryHandler doipHandler) {
handlers = new HashMap<>();
this.doipHandler = doipHandler;
Class handlerClass = doipHandler.getClass();
while(handlerClass != Object.class){
putDoipHandlerMethod(handlerClass);
Class[] interfaces = handlerClass.getInterfaces();
for (Class clz : interfaces) {
putDoipHandlerMethod(clz);
}
handlerClass = handlerClass.getSuperclass();
}
}
private void putDoipHandlerMethod(Class handlerClass) {
Method[] methods = handlerClass.getDeclaredMethods();
for (Method m : methods) {
Op a = m.getAnnotation(Op.class);
if (a != null) {
if (a.op() != BasicOperations.Extension) {
putHandler(a.op().getName(), m);
} else{
putHandler(a.name(), m);
}
}
}
}
private void putHandler(String name, Method m) {
if (handlers.containsKey(name)) {
return;
}
logger.debug("[Register operation] name: " + name);
m.setAccessible(true);
handlers.put(name, m);
}
@Override
public DoipMessage onRequest(DoipMessage msg) {
String str = msg.header.parameters.operation;
logger.debug("[Call operation] name: " + str);
if (str != null) {
Method m;
m = handlers.get(str);
if (m == null) m = handlers.get(BasicOperations.Unknown.getName());
if (m != null) {
try {
return (DoipMessage) m.invoke(doipHandler, msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
return null;
}
}
DOIP同样支持方法的扩展,但需要为扩展的方法增加注解以匹配新的操作码。扩展操作码的注解形式可参见BDRegistryHandler中的handleJoin方法:
public class BDRegistryHandler extends RegistryHandlerBase {
/*
.....
*/
@Op(op = BasicOperations.Extension,name = "Op.Join")
public DoipMessage handleJoin(DoipMessage request){
if(request.header.parameters.attributes == null ||request.header.parameters.attributes.get("repoID")== null){
logger.info("invalid join request: repoID not found");
return replyStringWithStatus(request, "invalid join request: repoID not found.", DoipResponseCode.Invalid);
}
String repoID = request.header.parameters.attributes.get("repoID").getAsString();
String repoUrl = request.header.parameters.attributes.get("repoUrl").getAsString();
inRegs.put(repoID,repoUrl);
saveRegFederation();
return replyString(request,"success");
}
/*
.....
*/
}
上述例子中BDRegistryHandler实现了RegistryHandler接口,除了实现基础的7个DOIP操作外还扩展了Join和Quit两个操作以支持Registry和Repository之间的组网。 以Join方法为例,通过为handleJoin方法添加注解@Op(op = BasicOperations.Extension,name = "Op.Join"),BDRegistryHandler可以处理DOIP消息中operation为Op.Join的消息, 其中op代表此方法为扩展的方法,name即为需要匹配的操作码。
DOIP协议底层可以基于任意的通讯协议传递消息,本项目基于Netty实现了TCP,UDP,TLS以及Websocket协议,开发人员也可以结合实际需求扩展其他通讯协议。
基于新通讯协议的DOIP需要实现DOIPListener和DOIPClientChannel两个接口,分别用于服务端(Repository/Registry)和客户端。 然后分别通过DoipListenerGenerator.addListener和DoipClientChannelGenerator.addClientChannel方法动态配置。 Repository和Client在启动时会根据url的schema选择相应的Listener/ClientChannel连接。
一个扩展的基于Bluetooth协议的DOIP可参见仓库https://gitee.com/blessser/DoRepoAtPhone,其中BlueToothDoipListener即为Bluetooth协议实现,代码示例如下:
public class BlueToothDoipListener implements DoipListener {
private static final String TAG = "BlueToothDoipListener";
private static Context ctx;
private BluetoothServerSocket mSSocket;
private boolean isListenning = true;
private PacketMessageCodec codec;
DoipRequestHandler doipRequestHandler;
DoipListenerInfo btListenerInfo;
public BlueToothDoipListener(Context c, DoipListenerInfo listenerInfo) {
codec = new PacketMessageCodecImpl();
ctx = c;
btListenerInfo = listenerInfo;
}
@Override
public void start() {
log("start listener");
try {
listen();
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void stop() {
try {
mSSocket.close();
isListenning = false;
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void setRequestHandler(DoipRequestHandler doipRequestHandler) {
this.doipRequestHandler = doipRequestHandler;
}
/**
* 监听客户端发起的连接
*/
@SuppressLint("HardwareIds")
private void listen() throws IOException {
BluetoothAdapter bluetoothAdapter = BluetoothAdapter.getDefaultAdapter();
if(bluetoothAdapter == null) {
//the device doesn't support bluetooth
Log.e("BlueTooth","do not support Bluetooth");
return;
}
if(!bluetoothAdapter.isEnabled()) {
Log.e("Bluetooth","start blue tooth first");
return;
}
mSSocket = bluetoothAdapter.listenUsingRfcommWithServiceRecord("DoipBTService", AndroidInitializer.DOIP_SERVICE_UUID);
log("try to listen, local address: " + bluetoothAdapter.getAddress());
log("try to listen, local name: " + bluetoothAdapter.getName());
btListenerInfo.url = "bluetooth://" + bluetoothAdapter.getAddress();
while(isListenning){
waitForConnect();
log("no input for 10s, connection released");
}
}
private void waitForConnect() throws IOException {
log("wait for connect");
BluetoothSocket connectedSocket = mSSocket.accept();
log("device connected, name: " +connectedSocket.getRemoteDevice().getName());
log("device connected, address: " +connectedSocket.getRemoteDevice().getAddress());
new BtSocketThread(connectedSocket).start();
}
private void handleAndResponseDoipMessage(DoipMessage msg, OutputStream output){
if (msg.header.parameters == null || msg.header.parameters.operation == null || msg.header.parameters.response != null) {
replyStringWithStatus(output,msg,"invalid request", DoipResponseCode.Invalid);
return;
};
if(msg.header.isCertified()){
try {
if(!GlobalCertifications.verifyDoipMessage(msg)){
log("verification failed");
return;
}
}catch (Exception e){
e.printStackTrace();
return;
}
}
if (msg.credential != null) log("[Caller] client ID: " + msg.credential.getSigner());
else log("[Caller] client ID: Anonymous");
DoipMessage response = doipRequestHandler.onRequest(msg);
if(response != null)
sendResponse(output, response);
else
defaultHandler(output,msg);
}
private void defaultHandler(OutputStream ctx, DoipMessage request) {
replyStringWithStatus(ctx,request,"Unsupported Operation!", DoipResponseCode.Declined);
}
private void replyStringWithStatus(OutputStream output, DoipMessage request, String str, DoipResponseCode resp) {
DoipMessage response;
response = new DoipMessageFactory.DoipMessageBuilder()
.createResponse(resp,request)
.setBody(str.getBytes())
.create();
sendResponse(output,response);
}
//基于编解码器将DoipMessage编码并通过Bluetooth协议的OutputStream发送
private void sendResponse(OutputStream output, DoipMessage response){
try {
if(response.header.isCertified()){
try {
GlobalCertifications.signDoipMessage(response);
}catch (Exception e){
e.printStackTrace();
}
}
ArrayList<MessageEnvelope> envs = codec.MessageToEnvelopes(response);
for(MessageEnvelope env:envs) {
byte[] envBytes = codec.EnvelopeToBytes(env);
log("sending bytes length: " + envBytes.length);
output.write(codec.EnvelopeToBytes(env));
}
}catch (Exception e){
e.printStackTrace();
}
}
//基于编解码器从Bluetooth协议的InputStream中解码DoipMessage
private DoipMessage getDoipMessage(InputStream in){
DoipMessage message = null;
try {
byte[] envHeader = getBytesFromInputStream(in,24);
int requestId = ((envHeader[8+3] & 0xFF)
| ((envHeader[8+2] & 0xFF)<<8)
| ((envHeader[8+1] & 0xFF)<<16)
| ((envHeader[8] & 0xFF)<<24));
log("requestId: " + requestId);
int contentLength = (int) ((envHeader[20+3] & 0xFF)
| ((envHeader[20+2] & 0xFF)<<8)
| ((envHeader[20+1] & 0xFF)<<16)
| ((envHeader[20] & 0xFF)<<24));
log("content Length: " + contentLength);
byte[] envContent = getBytesFromInputStream(in, contentLength);
return codec.BytesToMessage(envContent,requestId,null);
} catch (Exception e) {
e.printStackTrace();
}
return message;
}
private byte[] getBytesFromInputStream(InputStream in, int len) throws IOException {
log("reading inputStream, available: " + in.available());
int count = 0;
while(in.available() < len && count <100){
count ++;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
byte[] bytes = new byte[len];
in.read(bytes);
return bytes;
}
private static void log(String s){
Log.d(TAG,s);
}
class BtSocketThread extends Thread{
BluetoothSocket socket;
BtSocketThread(BluetoothSocket s){
socket = s;
}
@Override
public void run(){
try {
int missingInputCount = 0;
InputStream input = socket.getInputStream();
OutputStream output = socket.getOutputStream();
while(socket.isConnected()){
if(input.available() == 0){
missingInputCount ++;
// if(missingInputCount > 10){
// socket.close();
// }
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
DoipMessage req = getDoipMessage(input);
handleAndResponseDoipMessage(req,output);
}
log("device disconnected, name: " + socket.getRemoteDevice().getName());
}catch (Exception e){
e.printStackTrace();
}
}
}
}
public class BlueToothDoipClient implements DoipClientChannel {
private Context ctx;
private String address;
private final String TAG = "BluetoothDoipClient";
private BluetoothDevice device;
private BluetoothAdapter bluetoothAdapter;
private BluetoothSocket targetSocket;
private PacketMessageCodec codec = new PacketMessageCodecImpl();
Random random = new Random();
boolean isConnected = false;
private HashMap<Integer,DoipMessageCallback> waitingCallback = new HashMap<>();
private boolean connecting = false;
public BlueToothDoipClient(Context c){
ctx = c;
}
@Override
public void close(){
try {
targetSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void connect(String url) throws URISyntaxException{
isConnected = false;
URI uri = new URI(url);
this.address = uri.getAuthority();
try {
connectBTServer();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public boolean isConnected() {
return isConnected;
}
@Override
public void sendMessage(DoipMessage message, DoipMessageCallback cb){
log("try to send message");
if(device == null){
Log.e(TAG,"no connection to target repository device");
}
try {
while(!isConnected){
log("not connected, wait for connect.");
Thread.sleep(500);
}
if(message.requestID == 0) message.requestID = random.nextInt();
log("requestID in message: " + message.requestID);
OutputStream output = targetSocket.getOutputStream();
ArrayList<MessageEnvelope> envelopes = codec.MessageToEnvelopes(message);
waitingCallback.put(message.requestID,cb);
log("connected: " + targetSocket.isConnected());
for (MessageEnvelope env : envelopes) {
log("requestID in envelope: " + env.requestId);
byte[] envBytes = codec.EnvelopeToBytes(env);
log("sending bytes length: " + envBytes.length);
output.write(codec.EnvelopeToBytes(env));
}
InputStream input = targetSocket.getInputStream();
new ResponseHandlerThread(input).start();
}catch (Exception e){
e.printStackTrace();
targetSocket = null;
}
};
private void connectBTServer() throws IOException {
bluetoothAdapter = BluetoothAdapter.getDefaultAdapter();
if (bluetoothAdapter == null) {
// 说明此设备不支持蓝牙操作
log("do not support Bluetooth");
return;
}
if (!bluetoothAdapter.isEnabled()) {
AlertDialog dialog = new AlertDialog.Builder(ctx).setTitle("提示").setMessage("打开蓝牙再尝试!").create();
dialog.show();
return;
}
for(BluetoothDevice dev:bluetoothAdapter.getBondedDevices()){
log("bounded bt address: " + dev.getAddress());
log("bounded bt name: " + dev.getName());
log("bounded bt state: " + dev.getType());
};
device = bluetoothAdapter.getRemoteDevice(address);
if(device == null){
log("cannot find target device. ");
return;
}
log("remote bt address: " + device.getAddress());
log("remote bt name: " + device.getName());
log("remote bt state: " + device.getType());
targetSocket = device.createRfcommSocketToServiceRecord(AndroidInitializer.DOIP_SERVICE_UUID);
new ConnectThread(targetSocket).start();
}
void log(String s){
Log.d(TAG,s);
}
class ConnectThread extends Thread{
private BluetoothSocket mSocket;
ConnectThread(BluetoothSocket s) throws IOException {
mSocket = s;
}
@Override
public synchronized void run(){
// 关闭发现设备
bluetoothAdapter.cancelDiscovery();
if(mSocket.isConnected()) return;
try{
log("try to connect device");
connecting = true;
mSocket.connect();
log("connected: " + mSocket.isConnected());
log("remote device name: " + mSocket.getRemoteDevice().getName());
log("remote device address: " + mSocket.getRemoteDevice().getAddress());
isConnected = true;
}catch(IOException connectException){
connectException.printStackTrace();
}finally {
connecting = false;
}
}
}
class ResponseHandlerThread extends Thread{
InputStream input;
ResponseHandlerThread(InputStream in){
input = in;
}
@Override
public synchronized void run(){
log("try to get response:");
DoipMessage resp = getDoipMessageFromInputStream(input);
if(resp == null){
Log.e(TAG,"Unable to get response message");
return;
}
if(waitingCallback.get(resp.requestID) == null){
Log.e(TAG,"Unable to get callback");
return;
}
waitingCallback.get(resp.requestID).onResult(resp);
waitingCallback.remove(resp.requestID);
}
}
private DoipMessage getDoipMessageFromInputStream(InputStream in){
DoipMessage message = null;
try {
byte[] envHeader = getBytesFromInputStream(in,24);
int requestId = (int) ((envHeader[8+3] & 0xFF)
| ((envHeader[8+2] & 0xFF)<<8)
| ((envHeader[8+1] & 0xFF)<<16)
| ((envHeader[8] & 0xFF)<<24));
log("requestID: " + requestId);
int contentLength = (int) ((envHeader[23] & 0xFF)
| ((envHeader[22] & 0xFF)<<8)
| ((envHeader[21] & 0xFF)<<16)
| ((envHeader[20] & 0xFF)<<24));
log("content Length: " + contentLength);
byte[] envContent = getBytesFromInputStream(in, contentLength);
return codec.BytesToMessage(envContent,requestId,null);
} catch (Exception e) {
e.printStackTrace();
}
return message;
}
private byte[] getBytesFromInputStream(InputStream in, int len) throws IOException {
int count = 0;
while(in.available() < len && count <100){
count ++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
byte[] bytes = new byte[len];
in.read(bytes);
return bytes;
}
}
项目中提供了IRPClient和DOIPClient两个接口类以提供IRP和DOIP客户端的抽象接口。
通过DOIPClient可以进行DO创建、修改、获取等操作,通过构造特定的DoipMessage和Repository以及Registry进行交互;主要是构造相应的DoipMessage通过client进行信息传递和接受
创建的DO主要由属性(attributes)、元信息(metadata)以及元素(element)组成
如果attributes字段中不包含metadata信息,则在创建DO的时候不会自动创建元信息
element支持字符串、API以及文件类型,对应的type为string/api/file
{
"id": "创建的时候为空",
"type": "0.TYPE/DO",
"attributes": {
"desc": "属性信息",
"metadata": {
"desc": "元数据相关信息"
}
},
"elements": [
{
"id": "DO内部唯一的ID",
"length": "元素长度",
"type": "string",
"dataString": "元素data字节数组的字符串形式",
"attributes": {
"desc": "元素属性"
},
"data": []
}
]
}
当element的type为api时,element的attributes属性需包含API地址url、请求方法method,当api属性设置为true时,retrieve操纵会返回API的返回值,否则则不返回API的返回值
当element的type为file时,element的attributes属性可以包括file,表示通过http接口获取文件的真实内容作为element的data信息
DoipMessage msg =
new DoipMessageFactory.DoipMessageBuilder()
.createRequest(doipInfo.id, BasicOperations.Create.getName())
.setBody(digitalObject)
.create();
//设置用户标识
msg.credential = new MessageCredential(userID);
DoipMessage msg =
new DoipMessageFactory.DoipMessageBuilder()
.createRequest(doID,BasicOperations.Retrieve.getName())
.create();
如果需要返回DO中的Element信息,需要配置
msg.header.parameters.addAttribute("includeElementData", true);
如果只访问指定的Element信息,则需要设置访问的Element的id
msg.header.parameters.addAttribute("element", elementID);