likes
comments
collection
share

Apache Camel的消息模型

作者站长头像
站长
· 阅读数 3

正如官网介绍:

Camel是一个可以让你非常容易集成各种系统进行消费使用或者生产数据的开源的集成框架。

那么它是怎么进行消费或者生产数据的呢?这里就要提到:Camel的消息模型。

Camel uses two abstractions for modeling messages, both of which we cover in this section:

  • org.apache.camel.Message—包含在 Camel 中传输和路由Route的数据的基本实体。
  • org.apache.camel.Exchange—用于交换消息的 Camel 抽象接口。此消息交换具有in输入消息,作为reply回复,out输出消息。

我们将从查看Message开始,以便了解数据在 Camel 中的模型和传输方式。在Route过程中,Message包含在Exchange中,后面我们介绍Exchange。

Message

Messages are the entities used by systems to communicate with each other when using messaging channels. Messages flow in one direction, from a sender to a receiver。

Apache Camel的消息模型

Messages 含有

  1. body (a payload)
  2. headers
  3. 可选 attachments

Apache Camel的消息模型

Message使用`java.lang.String`类型的标识符进行唯一标识。标识符的唯一性由消息创建者强制执行和保证,它依赖于协议,并且没有保证的格式。对于未定义唯一消息标识方案的协议,Camel 使用自己的 ID 生成器。

Headers

Headers 是与Message关联的值,例如sender(消息发送方)标识符,有关编码的提示,身份验证信息等。标头是 name-value对形式。name是唯一的不区分大小写的字符串,value类型为java.lang.Object对象。Camel对Headers的类型没有限制(Object),对其的大小或Message中包含的Headers数量也没有限制。Header以Map形式存储在消息中。

Attachments

Message也提供可选的Attachments(附件),这些Attachments通常用于Web服务和电子邮件组件中。

Body

Body的类型为java.lang.Object,作为Object对象因此可以存储任何类型的内容并且Camel也不限制其大小。作为程序的开发设计人员我们有责任确保接收方能理解Message的内容Body。当发送发和接收方使用不同的Body格式时,Camel提供了数据类型转换,我们后面会介绍到。

Fault flag

Message同时也具有Fault flag 错误标志。在一些协议和规范(如SOAP Web服务)区分output 和fault Messages。它们都是对调用操作的有效响应,但是后者表示结果不成功。通常Camel是不会处理faults,它们是客户端和服务器之间协定的一部分,应该是在应用级别中处理。(开发人员在代码中处理)

Exchange

Camel中的Exchange是路由过程中的Message container 消息容器,即Message是包含在Exchange中的。Exchange同时也为系统之间的各种类型交互提供支持,称为 Message Exchange PatternsMEP消息交换模式。

MEP用于区分两者模式

  • one-way 单向
  • request-response 请求响应

Camel的Exchange有一个属性MEP,该值两种选项:

  • InOnly—A one-way message。单向消息(也称为Event Message事件消息)。例如,JMS消息传递通常是单向消息传递。
  • InOut—A request-response message.请求-响应消息。例如,基于http的传输通常是请求-响应:客户端提交一个web请求,等待来自服务器的应答

Apache Camel的消息模型

Exchange ID

标识Exchange的唯一ID。Camel将自动为Exchange生成唯一的ID。

MEP

表示您是使用InOnly还是InOut消息传递样式的模式。当模式为:

  • InOnly时,交换包含一条in消息。
  • 对于InOut,还存在一条out消息,其中包含Caller调用方的应答消息。
  • InOptionalOut 模式下,处理器可以根据输入消息的内容和条件来决定是否生成输出消息。这种模式通常用于处理器可能会生成输出消息,但不是必须的情况。处理器可以根据业务逻辑来决定是否生成输出消息,以满足特定的需求。 总之, InOptionalOut 表示处理器可以选择性地生成输出消息,具有一定的灵活性和可选性。(新特性)

Exception

如果在路由过程中随时发生错误,将在异常字段中设置Exception异常

Properties

与Message Headers类似,但它们在整个交换过程中持续存在。属性用于包含全局级信息,而消息头则特定于特定消息。Camel本身在路由过程中向交换器添加了各种属性。作为开发人员,您可以在交换的生命周期内的任何时候存储和检索属性。

In message

这是input message 输入消息,这是必需的。in消息包含request请求消息。

Out message

这是一条可选消息,仅在MEP为InOut,或InOptionalOut满足条件时才存在。out消息包含response应答消息。

第一个Camel程序回顾

我们通过Processor的process方法访问了Exchange对象,我们修改一下代码。来看看用到了上面的那些属性。

package com.kylin;

import org.apache.camel.CamelContext;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

import java.util.Map;

public class FileCopierWithCamel_MessageModel {

    public static void main(String args[]) throws Exception {
        // create CamelContext
        CamelContext context = new DefaultCamelContext();

        // add our route to the CamelContext
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("file:C:/Users/kylin/Desktop/test?noop=true")
                        .process(exchange -> {
                            //properties
                            System.out.println("Exchange-properties");
                            Map<String, Object> properties = exchange.getProperties();
                            properties.forEach((k,v)->{
                                System.out.println(k+":"+v);
                            });

                            
                            //headers
                            System.out.println("In Message-headers");
                            Message inMessage = exchange.getIn();
                            Map<String, Object> headers = inMessage.getHeaders();
                            headers.forEach((k,v)->{
                                System.out.println(k+":"+v);
                            });

                            //body
                            System.out.println("In Message-body");
                            Object body = inMessage.getBody();
                            System.out.println(body);


                        })
                        .to("file:C:/Users/kylin/Desktop/copy");
            }
        });

        // start the route and let it do its work
        context.start();

        Thread.sleep(10_000);

        // stop the CamelContext
        context.stop();

    }

}

输出结果

Exchange-properties
CamelFileExchangeFile:GenericFile[C:\Users\kylin\Desktop\test\pwd.txt]
In Message-headers
CamelFileAbsolute:true
CamelFileAbsolutePath:C:\Users\kylin\Desktop\test\pwd.txt
CamelFileInitialOffset:org.apache.camel.support.resume.Resumables$AnonymousResumable@2f451841
CamelFileLastModified:1690891490736
CamelFileLength:881894
CamelFileName:pwd.txt
CamelFileNameConsumed:pwd.txt
CamelFileNameOnly:pwd.txt
CamelFileParent:C:\Users\kylin\Desktop\test
CamelFilePath:C:\Users\kylin\Desktop\test\pwd.txt
CamelFileRelativePath:pwd.txt
CamelMessageTimestamp:1690891490736
In Message-body
GeneriGencFile[C:\Users\kylin\Desktop\test\pwd.txt]

Message的Attachments通常用于Web服务和电子邮件组件中,我们这个File组件中并没有。而Fault flag同样用于例如SOAP Web服务组件中。

Out message只有MEP为InOut是才存在。我们可以通过:exchange.setPattern();设置,值为:

  1. ExchangePattern.InOnly
  2. ExchangePattern. InOut
  3. ExchangePattern.InOptionalOut

其中Message Body就是我们创建这个路由下指定路径下的文件,类型为Camel创建的GeneriGencFile对象。

而打印出来的Message headers(我们没有向里面添加header),则是File Component 添加的headers。我们可以在对应的Component文档中查询对应的含义。

Apache Camel的消息模型

NameDescriptionDefaultType
CamelFileLength (consumer)Constant: FILE_LENGTHA long value containing the file size.long
CamelFileLastModified (consumer)Constant: FILE_LAST_MODIFIEDA Long value containing the last modified timestamp of the file.long
CamelFileLocalWorkPath (producer)Constant: FILE_LOCAL_WORK_PATHThe local work path.File
CamelFileNameOnly (common)Constant: FILE_NAME_ONLYOnly the file name (the name with no leading paths).String
CamelFileName (common)Constant: FILE_NAME(producer) Specifies the name of the file to write (relative to the endpoint directory). This name can be a String; a String with a xref:languages:file-language.adocFile Language or xref:languages:simple-language.adocSimple expression; or an Expression object. If it’s null then Camel will auto-generate a filename based on the message unique ID. (consumer) Name of the consumed file as a relative file path with offset from the starting directory configured on the endpoint.String
CamelFileNameConsumed (consumer)Constant: FILE_NAME_CONSUMEDThe name of the file that has been consumed.String
CamelFileAbsolute (consumer)Constant: FILE_ABSOLUTEA boolean option specifying whether the consumed file denotes an absolute path or not. Should normally be false for relative paths. Absolute paths should normally not be used but we added to the move option to allow moving files to absolute paths. But can be used elsewhere as well.Boolean
CamelFileAbsolutePath (consumer)Constant: FILE_ABSOLUTE_PATHThe absolute path to the file. For relative files this path holds the relative path instead.String
CamelFileExtendedAttributes (consumer)Constant: FILE_EXTENDED_ATTRIBUTESThe extended attributes of the file.Map
CamelFileContentType (consumer)Constant: FILE_CONTENT_TYPEThe content type of the file.String
CamelFilePath (consumer)Constant: FILE_PATHThe file path. For relative files this is the starting directory the relative filename. For absolute files this is the absolute path.String
CamelFileRelativePath (consumer)Constant: FILE_RELATIVE_PATHThe relative path.String
CamelFileParent (common)Constant: FILE_PARENTThe parent path.String
CamelFileNameProduced (producer)Constant: FILE_NAME_PRODUCEDThe actual absolute filepath (path name) for the output file that was written. This header is set by Camel and its purpose is providing end-users with the name of the file that was written.String
CamelOverruleFileName (producer)Constant: OVERRULE_FILE_NAMEIs used for overruling CamelFileName header and use the value instead (but only once, as the producer will remove this header after writing the file). The value can be only be a String. Notice that if the option fileName has been configured, then this is still being evaluated.Object
CamelFileInitialOffset (consumer)Constant: INITIAL_OFFSETA long value containing the initial offset.long

后面我们也会详细的介绍如何通过Camel官网使用某个组件。怎么通过官方文档,来快速对这个组件的使用有一个认知,帮助我们快速上手使用所需要的组件~