《业务数据集成.docx》由会员分享,可在线阅读,更多相关《业务数据集成.docx(32页珍藏版)》请在三一办公上搜索。
1、1.1.1. 功能概述业务数据集成是指MDM Server与外围系统之间的数据交换时需要中间处理的模块应用层。如图所示,外围系统主要通过三种模式与MDM Server交互(这里不考虑ETL):l 直接的web service访问 主要通过http协议联机访问,一般不通过业务集成层而直接访问数据服务层l 近实时的消息通知 主要通过MQ消息访问,需要在业务集成层进行必要的处理l 批量处理 主要通过文件进行数据传递,需要在业务集成层进行处理业务集成服务层的主要内容包括:l 近实时的消息通知n 数据标准代码的转换n 交易xml的拆分n 其他规则l 批量处理n 标准代码的转换n 入库方式判断处理,判断是
2、否是新增还是更改n 交易拆分n 其他规则1.1.2. 联机数据集成(基于MQ/XML)联机数据集成主要处理近实时消息通知,处理模式如下图所示。MQ组件是所有的客户信息更新消息的载体,MsgReceiver组件负责接收MQ的消息,通过MsgParser组件进行xml格式解析,MsgReceiver转交到IntegrateFW组件,IntegrateFW组件是转发框架,负责业务集成层的基本控制,根据报文的类型调用必要的规则处理进行数据转换或者信息拆分,然后交由具体逻辑处理实现模块IntegrateImpl完成业务集成逻辑,并最终通过ServiceInvocation模块调用MDM的服务,完成交易。
3、数据服务层主要包括扩展服务和组合服务。如上图,根据业务需求,一般性的顺序如下详述:1 ejbCreate MQ中的信息通知WAS容器,根据MsgReceiver组件的MDB情况,创建一个实例进入池态;2 onMessage WAS容器调用MDB的onMessage方法,并把消息作为参数传递到该方法;2.1 deliver onMessage方法中调用IntegrateFW组件的deliver方法,并传递消息;2.1.1 parse IntegrateFW组件调用MsgParser组件解析传入的消息体;2.1.1.1 Constructor MsgParser组件根据消息体解析,生成一个IBOb
4、j对象;2.1.2 parse parse方法返回生成的IBObj对象;2.1.3 getTCRMTxType 获取IBObj对象的服务类型;2.1.4 getTCRMTxType 返回字符串结果;2.1.5 getRequestName 获取IBObj的请求者;2.1.6 getRequestName 返回;2.1.7 Constructor IntegrateFW根据服务对象和请求者,实例化一个IntegrateImpl组件中集成服务实例;2.1.8 execute 调用具体实例的业务逻辑,并把IBObj作为参数传递;2.1.8.1 callRule 调用标准代码转换规则,进行必要规则处理
5、;2.1.8.2 callRule 返回规则处理后的IBObj对象;2.1.8.3 matchRule 调用客户识别规则,进行识别2.1.8.4 matchRule 调用返回2.1.8.5 constructIBObj 在具体的业务逻辑处理中,根据具体需要调用MsgParser创建需要的IBObj对象;2.1.8.5.1 Constructor MsgParser构建新的IBObj实例;2.1.8.6 constructIBObj 返回具体的IBObj实例;2.1.8.7 setAttribute 处理具体的IBObj实例内容;2.1.8.8 setAttribute 处理内容返回;2.1.8
6、.9 toServiceXml 处理完所有的IBObj实例内容后,调用方法转换为xml标准服务格式;2.1.8.10 toServiceXml 返回xml字节流;2.1.8.11 invoke 调用MDM服务;2.1.8.12 invoke 返回;2.1.9 execute 完成具体业务集成逻辑实例的执行;2.2 deliver 完成整个业务集成逻辑的执行;注:2.1.8到2.1.9之间可能有多次的callRule和invoke调用;2.1.8 到2.1.9 execute的调用需要包括在事务处理中;批量处理的业务集成层设计参见批量型章节。1.1.2.1. MsgReceiver组件MsgRe
7、ceiver组件主要由IntegrateMDB组成,IntegrateMDB是一个MessageDriverBean,通过onMessage(Message)方法接收MQ的消息通知。信息接收组件主要负责从MQ接收消息,参数Message使用BytesMessage类型,消息格式为XML形式,其具体的接口定义参见服务接口定义中的相关通过MQ接口部分。接口主要包括:l 个险 增加客户(addPerson / CSCMQ)l 个险 保全修改客户(updatePerson / CSCMQ)l 电商网上销售 增加客户(addPerson / TOLMQ)l 电商网上销售 修改客户基本信息(updateP
8、ersonBase / TOLMQ)l 电商网上销售 保全修改客户(updatePerson / TOLMQ)注:以上括号中的服务名是指各个业务系统提交到MQ中的消息服务名,而不是MDM调用的标准服务名;组件调用IntegrateFW组件进行后续处理。public void onMessage(Message msg) if (msg instanceof BytesMessage) BytesMessage bm = (BytesMessage) msg; long length = bm.getBodyLength(); byte bs = new bytelength; bm.getBy
9、tes(bs); BytesInputStream bis = new BytesInputStream(bs); IntegrateFW.deliver(bis); else 1.1.2.2. IntegrateFW组件IntegrateFW组件是框架组件,负责业务集成层的控制调度。组件负责调用MsgParser组件,解析xml报文,然后根据报文类型调用具体的控制处理逻辑,同时调用业务处理规则进行数据和业务处理,最后形成处理后的报文提交ServiceInvocation组件调用MDM Service,完成整个处理,同时如果在处理中发生异常,则交由IntegException组件处理异常结果。
10、IntegrateFW组件中对于多个service调用作为同一事务来管理,如果调用出错需要进行回滚处理。IntegrateFW组件暴露一个接口IntegrateLogic供IntegrateImpl组件实现,其接口方法如下。/* * 执行具体的业务集成任务. 用于MQ异步通知模式 * 在IntegrateImpl组件实现该方法,处理具体某一项接口的逻辑调用,一般包括 * 代码转换、服务拆分、服务变换、DSP判断等 * param is 输入流,是通过MQ接收到的消息内容,xml字节流 * throws ItegrateException 例外,如果发生例外需要到例外组件处理 */public
11、void execute(IBobj obj) throws ItegrateException;组件对外调用的Faade接口调用是IntegratFW类,使用静态方法。/* * 传递到IntegrateFW组件执行后续任务. 用于MQ异步通知模式 * param is 输入流,是通过MQ接收到的消息内容,xml字节流 */public static void deliver(InputStream is);1.1.2.3. IntegrateImpl组件IntegrateImpl组件是根据报文类型确定的具体的业务集成逻辑的实现。一期需求主要包括:l 个险增加客户l 个险修改客户基本信息l 个
12、险保全修改客户信息l 网上销售增加客户参见个险增加客户,只是其接口内容稍有差异l 网上销售修改客户基本信息参见个险修改客户基本信息,只是其接口内容稍有差异l 网上销售保全修改客户信息参见个险保全修改客户信息,只是其接口内容稍有差异此组件需要处理地址、电话、电子邮件、客户标识等的格式变换,是把输入的对象格式变为标准的MDM扩展服务格式,包括:l 从粗粒度服务格式转换为updatePersonName服务格式l 转换为changePartyAddress服务格式 服务中确定具体调用correctPartyAddress还是addPartyAddress还是不做处理;l 转换为changeParty
13、ContactMethod服务格式 服务中定具体调用updatePartyContactMethod还是addPartyContactMethod还是不做处理;l 从粗粒度服务格式转换为changePartyIdentifer服务格式 服务中定具体调用updatePartyIdentifer还是addPartyIdentifer还是不做处理;具体接口参见服务接口和MDM开发文档。服务逻辑参见相关在线服务组合服务设计;1.1.2.4. MsgParser组件信息解析主要是根据xml报文解析其内容。MsgParser组件根据不同的报文类型解析不同的内容。MsgParser组件还负责输出变更后的报文
14、结果。MsgParser是暴露在外的Faade调用界面,通过两个个方法接收外部调用。/* * 解析xml. * param is 输入流,是通过MQ接收到的消息内容,xml字节流 */public static IBObj parse(InputStream is);/* * 把IBObj根据类型输出为标准的MDM service报文. * param obj 输入的IBObj * return 返回一个字节数组,组成xml,使用标准的iso-8859-1格式 */public static byte toServiceXml(IBObj obj);/* * 根据名称构建新的IBObj对象.
15、* param name 输入的IBObj名称 * return 返回IBObj对象 */public static IBObj constructIBObj(String name);IBObj是用于定义报文的bean,类似MDM中的BObj。IBObj主要定义xml中的bobj对象,IBObj内部可以嵌套。其暴露的接口如下。/* * 设置IBObj的类型. 是指对象类型,如TCRMPersonBObj、* TCRMAdminContEquivBObj、TCRMPartyAddressBObj、TCRMAddressBObj等。*/public void setType(String nam
16、e);public String getType();/* 设置服务的类型. */public void setTCRMTxType(String type);pubic String getTCRMTxType();/* 设置服务的对象类型. */public void setTCRMTxObject(String obj);public String getTCRMTxObject();/* 设置IBObj对象. */public void setIBObj(IBObj obj);/* 根据名字获取IBObj对象. 只能获取下一级对象.*/public IBObj getIBObj(Str
17、ing type);public String getAttribute(String type);/* 设置属性. 自动区分处理Extension属性.*/public String setAttribute(String type, String value);/* 设置服务头. */public void setHeader(String header);public String getHeader();/* 设置请求控制部分. */public void setRequestControl(String rc);public String getRequestControl();/*
18、 设置请求控制部分的请求名称. */public void setRequestName(String rn);public String getRequestName();/* 设置请求控制部分的LOB. */public void setRequestLOB(String rlob);public String getRequestLOB();1.1.2.5. DSP Rule组件处理DSP规则,具体参见DSP设计中在线可疑客户识别。1.1.2.6. CD Rule组件处理标准代码的转换,根据requestName来判断来源,并把源系统标准代码转换为MDM标准代码。CDRule组件由CDP
19、ool从数据库中装载标准代码数据映射。映射关系是各个源系统指向MDM。sources.properites文件是每个源系统接口中的需要转换的代码定义,其格式如下:CSCMQ= GenderType|HighestEducationType|TCRMAdminContEquivBObj. AdminSystemType|TOLMQ= GenderType|格式以Key=Value方式存放,key为源系统提交的requestName,而值是以|分割的多个数据域,每个数据域都需要进行标准代码替换。如果该数据域是在接口的更底层,以.作为路径分割,如TCRMAdminContEquivBObj.Admi
20、nSystemType表示是在该对象中的TCRMAdminContEquivBObj对象下的AdminSystemType需要进行代码转换。具体的实现类CDRule则根据输入IBObj对象和相应数据定义进行代码转换任务。RuleFW是对外暴露的调用类,提供静态方法。/* * 规则调用处理. * param type 规则类型,根据此类型确定是调用哪个规则实现 * param obj 输入输出对象,规则实现对其进行处理,并形成结果返回 */public static void callRule(Stirng type, IBObj obj) throws IntegrateException;一
21、期需要进行的代码转换为:等待标准代码标对应完毕后补充l 个险数据域源系统代码源系统值MDM代码MDM值GenderTypeM男M男F女F女U未知U未知TCRMPartyIdentificationBObj.IdentificationType0身份证00其它1参字第10后字第11空文字第12北文字13护照14装字第15北文字第RelationshipType1本人2丈夫3妻子4父亲5母亲6儿子HighestEducationType01文盲02小学03初中04高中05大专06本科07研究生及以上08中专未知MaritalStatusType0未婚0未婚1已婚1已婚2离婚2离婚3鳏寡3鳏寡Add
22、ressUsageTypeB单位地址2单位地址P邮递地址3邮递地址R家庭地址1家庭地址l 电商网上销售l 银保l 养老金l 团险1.1.2.7. Utils组件Utils组件是工具类组件,主要包括l 服务调用组件,负责进行MDM服务的调用通过IIOP方式访问MDM的EJB(DWLServiceController)来处理web service格式的请求,具体请参考MDM workbench中的com.ibm.mdm.training.testerDWLServiceControllerTester。l 数据库访问1.1.2.8. IntegException组件例外处理的组件,如果处理过程中有
23、例外,则需要记录例外的原因和状态,并把该服务请求xml保存到数据库。例外后需要保存的内容如下:字段代码类型备注错误流水号ERRLOGIDBigIntPK,自增型时间LOG_DTTimestamp错误返回消息ERRMESSAGEVarchar(255)exception.getMessage()或者业务逻辑错误说明,如找不到该客户等错误提交消息REQUESTXMLXml字段错误堆栈同样需要记录到log4j日志中。1.1.3. 批量数据集成(基于批量/XML)批量处理的主要内容类似业务数据集成章节的近实时处理部分,主要是FileParser模块管理各个业务系统上传的批量文件,并调用MsgParse
24、r模块解析具体的数据内容,同样,根据规则处理各个业务逻辑规则,然后形成具体的批量文件,并调用MDM的BatchProcessor进行批量处理。具体模块关系如下图所示。批量处理的部分将会复用较多的近实时处理部分的业务组件,包括:l MsgParser组件l IntegException组件l CDRule组件1 detectFile 守护线程,检查各个系统相应目录下的上穿文件是否ready;2 Constructor 读入文件,形成文件输入流和输出流,并调用批量处理,根据源系统类型实例化具体的业务逻辑处理实例,传递输入流;3 readOneService 从输入流读取一个service块;4 r
25、eadOneService 返回读取的service块;5 parse 调用MsgParser把servie块的xml格式解析成为一个IBObj;6 parse 返回IBObj;7 callRule 调用代码转换rule,转换客户证件类型、地址类型、联系类型为MDM标准代码;8 callRule 调用返回;9 writeTmp 把客户关键信息写入数据库,内容参见临时表定义;10 writeTmp 返回;以上从3到10循环处理,直到文件输入流到尾端;11 resetFile 把文件输入流重新定位到开始;12 resetFile 返回重新定位后的文件输入流;13 callRule 调用批量客户识别
26、的规则处理;13.1 updateTmp 根据规则处理结果更新临时表,确定客户增加、修改类型以及客户的地址、联系、证件类型更改类型;13.2 updateTmp 处理临时表结束;14 callRule 调用规则返回;15 readOneService 从文件输入流读入一个service xml块;16 readOneService 返回service块;17 parse 调用MsgParser解析为一个IBObj;18 parse 返回IBObj;19 readTmp 读入一条与IBObj匹配的临时表记录;20 readTmp 读入返回数据;21 callRule 调用格式转换规则处理,根据临
27、时表类型标志,转换IBObj的内容为标准服务的IBObj;22 callRule 返回转换完毕的IBObj;23 toServiceXml 把IBObj转换为标准的xml格式;24 toServiceXml xml结果;25 writeXml2File 把xml结果写入文件输出流;26 writeXml2File 返回;以上15到26循环处理,直到文件处理完毕;27 runbatch.sh 关闭相应资源,调用MDM的BatchProcessor进行批量处理,完成后处理相应输入输出及中间文件;1.1.1.1. 批量处理接口批量处理主要包括三个外部系统接口:l 银保增加客户l 养老金增加客户l 团
28、险增加客户具体接口格式参见服务接口外部系统输出符合接口规范的数据文件,并FTP到规定的目录,具体参见服务接口中“批量接口模式”。1.1.1.2. FileParser组件FileParser组件是解析文件的处理,其主要处理客户的上传数据文件,同时新建输出结果文件。解析整体文件格式;包括输入输出,输出文件每个service xml只能是一行;FileParser调用IntegrateFW组件以处理不同的业务集成逻辑。FileParser组件暴露一个抽象类IntegrateBatchLogic供IntegrateImpl组件实现,其抽象方法如下。/* * 执行具体的业务集成任务. 用于批量模式 *
29、 在IntegrateImpl组件实现该方法,处理具体某一项接口的逻辑调用,一般包括 * 代码转换、服务拆分、服务变换、DSP判断等 * param is 输入流,是通过MQ接收到的消息内容,xml字节流 * throws ItegrateException 例外,如果发生例外需要到例外组件处理 */public abstract void executeBatch(InputStream is,OutputStream os) throws ItegrateException;在IntegrateBatchLogic中还实现必要方法: resetFile 重置文件指针到文件头; callBa
30、tchSDP 调用批量客户识别规则; callRule 调用代码转换规则; getService 获取文件的下一个service块; callMDMBatch 调用MDM的runbatch.sh1.1.1.3. IntegrateImpl组件具体的业务集成逻辑实现,三个不同源系统的增加客户业务集成逻辑相同,其具体内容格式有所差异。具体逻辑处理如下图。1.1.1.4. 批量客户识别匹配规则批量客户识别匹配规则是指考虑执行效率而进行的客户识别匹配程序,其匹配规则与在线完全相同,但处理方式变成批量,规则参考如下:统一客户管理平台进行客户识别的关键数据包括:客户名称+出生日期、证件类型+证件编号以及客
31、户性别。为了提高系统的客户识别能力和信息的准确度,加快业务处理的速度,系统针对用于客户识别的关键数据进行了有效性规定,符合下列规定的信息才可以作为客户识别的依据:n 客户名称不允许为空n 证件编号长度至少8位。不满足上述有效性规则的客户信息会被加入统一客户管理平台,但不会进行客户识别。注:规则以在线匹配规则为准,请参考相关设计文档以确认规则一致性;客户匹配的主体是MDM数据库的contact / person / identifier表;另外进行匹配的表是临时表:tmp_contactSql参考:- 先根据客户号和源系统id进行匹配update tmp_contact t set (MATCH
32、_TP,MATCH_PARTYID,MATCH_PARTYUPDT,MATCH_PERSONUPDT,PERSONNAMEID,PERSONNAME_UPDT) = (select 1,c.cont_id ,c.last_update_dt,p.last_update_dt,pn.person_name_id,pn.last_update_dtfrom contact c,person p,tmp_contact t,contequiv e ,personname pnwhere c.cont_id=p.cont_idand t.lob_tp=e.ADMIN_SYS_TP_CD and t.l
33、ob_custno=e.ADMIN_CLIENT_ID and e.cont_id=p.cont_idand pn.cont_id=c.cont_idand pn.name_usage_tp_cd=1and (pn.end_dt is null or pn.end_dt current timestamp)and (c.end_dt is null or c.end_dt current timestamp)and (p.end_dt is null or p.end_dt current timestamp)and (e.end_dt is null or e.end_dt current
34、timestamp)where exists (select 1from contact c,person p,tmp_contact t,contequiv e ,personname pnwhere c.cont_id=p.cont_idand t.lob_tp=e.ADMIN_SYS_TP_CD and t.lob_custno=e.ADMIN_CLIENT_ID and e.cont_id=p.cont_idand pn.cont_id=c.cont_idand pn.name_usage_tp_cd=1and (pn.end_dt is null or pn.end_dt curre
35、nt timestamp)and (c.end_dt is null or c.end_dt current timestamp)and (p.end_dt is null or p.end_dt current timestamp)and (e.end_dt is null or e.end_dt current timestamp);- 根据证件类型证件号码客户姓名客户生日进行匹配update tmp_contact t set (MATCH_TP,MATCH_PARTYID,MATCH_PARTYUPDT,MATCH_PERSONUPDT,PERSONNAMEID,PERSONNAME_
36、UPDT)= (select 1,c.cont_id ,c.last_update_dt,p.last_update_dt,pn.person_name_id,pn.last_update_dtfrom contact c,person p,tmp_contact t,identifier i,personname pnwhere c.cont_id=p.cont_id and i.cont_id=c.cont_id and t.id_tp_cd=i.id_tp_cd and t.ref_num=i.ref_num and length(rtrim(ltrim(t.ref_num)7 and
37、t.contact_name=c.contact_name and t.person_name is not null and t.person_name and (t.GENDER_TP_CODE is null and p. t.GENDER_TP_CODE is null) or t.GENDER_TP_CODE=p.GENDER_TP_CODE)and char(t.birth_dt, ISO)=substr(p.birth_dt, 1, 10) and t.match_tp=2and pn.cont_id=c.cont_idand pn.name_usage_tp_cd=1and (
38、pn.end_dt is null or pn.end_dt current timestamp)and (c.end_dt is null or c.end_dt current timestamp)and (p.end_dt is null or p.end_dt current timestamp)and (i.end_dt is null or i.end_dt current timestamp)where exists (select c.cont_id from contact c,person p,tmp_contact t,identifier i,personname pn
39、where c.cont_id=p.cont_id and i.cont_id=c.cont_id and t.id_tp_cd=i.id_tp_cd and t.ref_num=i.ref_num and length(rtrim(ltrim(t.ref_num)7 and t.contact_name=c.contact_name and t.person_name is not null and t.person_name and (t.GENDER_TP_CODE is null and p.GENDER_TP_CODE is null) or t.GENDER_TP_CODE=p.G
40、ENDER_TP_CODE)and char(t.birth_dt, ISO)=substr(p.birth_dt, 1, 10) and t.match_tp=2and pn.cont_id=c.cont_idand pn.name_usage_tp_cd=1and (pn.end_dt is null or pn.end_dt current timestamp)and (c.end_dt is null or c.end_dt current timestamp)and (p.end_dt is null or p.end_dt current timestamp)and (i.end_
41、dt is null or i.end_dt current timestamp);- 根据匹配结果查找需要修改的证件号码update tmp_contact t set (IDENT_MATCH_TP,IDENTIFIR_ID, IDENTIFIR_UPDT) = (select 1 ,IDENTIFIER_ID ,LAST_UPDATE_DTfrom tmp_contact t ,identifier iwhere t.match_partyid=i.cont_id and t.id_tp_cd=i.id_tp_cd and t.ref_numi.ref_num and (i.end_dt
42、 is null or i.end_dt current timestamp) where exists (select 1from tmp_contact t ,identifier iwhere t.match_partyid=i.cont_id and t.id_tp_cd=i.id_tp_cd and t.ref_numi.ref_num and (i.end_dt is null or i.end_dt current timestamp);- 根据匹配结果查找需要修改的地址update tmp_partyaddress set(ADDR_UP_TP, PARTYADDRESSID,
43、 PARTYADDRESSUPDT, LOCATIONGROUPUPDT) = (select 2 ,a.location_group_id,a.last_update_dt ,l.last_update_dtfrom locationgroup l ,adderssgroup a ,tmp_contact t ,tmp_partyaddress pawhere l.location_group_id=a.location_group_id and l.cont_id=t.matcd_partyidand t.lob_tp=pa.lob_tpand t.lob_custno =pa.lob_custnoand pa.ADDR_USAGE_TP_CD=a.ADDR_USAGE_TP_CDand (l.end_dt is null or l.end_dt current timestamp)and (a.end_dt is null or a.end_dt current timestamp)where exists(select 1from locationgr