Kafka - 连接分布式消息服务

Kafka是LinkedIn贡献给Apache的开源时时流处理平台(http://kafka.apache.org/)。

主要功能

  • 支持Kafka 2.5版本的流消息服务
  • 提供CCAPI封装,方便开发者使用
  • 提供Kafka Producer(向Kafka发送消息)的流程可编排服务(Service Task)
  • 支持CC部署环境变量
  • 支持SLA服务质量监控和告警通知,提供全局requestId
  • 提供访问日志开关

特别说明

  • 使用该应用要求AWS PaaS平台许可支持CC服务
  • 使用该应用要求AWS PaaS平台版本不低于6.4.1

1.配置

基本信息

配置Kafka适配器。

说明
名称 Kafka适配器的名称
服务地址 设置bootstrap.server,格式:host1:port1,host2:port2支持@公式,如@env(xxx)可获取环境中相应变量值
描述 适配器描述信息

扩展属性

配置Kafka发送、接收消息的类型。仅支持配置一种类型,如需处理多种类型消息,请创建多个Kafka连接器。

2.调用

由开发在Java代码中调用Kafka服务。

获取模型ID

在CC连接列表中可获取适配器模型ID。

API代码示例

在访问前,我们假设已创建了一个CC Kafka技术适配器,其分配的模型Id为00000000

import com.actionsoft.bpms.bpmn.engine.core.delegate.ProcessExecutionContext;
import com.actionsoft.bpms.bpmn.engine.listener.ExecuteListener;
import com.actionsoft.sdk.local.SDK;
import com.actionsoft.sdk.local.api.cc.KafkaAPI;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * 需要在开发环境中引入AWS PaaS平台以下目录下所有jar包:
 * bin/lib目录下
 * bin/jdbc目录下
 * apps/install/com.actionsoft.apps.cc.connector.kafka/lib目录下
 */
public class KafkaAPITest extends ExecuteListener {
    @Override
    public void execute(ProcessExecutionContext processExecutionContext) throws Exception {
         //获得一个操作KafkaAPI服务的getKafkaAPI对象
        KafkaAPI<String,String> kafkaAPI = SDK.getCCAPI().getKafkaAPI("00000000");

        //发送消息到指定topic
        kafkaAPI.sendTopic("test","heelo");
        //消息发送
        kafkaAPI.execute(new KafkaAPI.ConsumerCallback<String, String>() {

            @Override
            public void doInKafka(Consumer<String, String> consumer) {
                List<PartitionInfo> list = consumer.partitionsFor("test");
                List<TopicPartition> tps = new ArrayList<TopicPartition>();
                for (PartitionInfo pi : list) {
                    TopicPartition tp = new TopicPartition(pi.topic(), pi.partition());
                    tps.add(tp);
                    consumer.assign(Collections.singletonList(tp));
                    long l = consumer.position(tp);
                    System.out.println("position:" + tp + "," + l);
                }
                Map<TopicPartition, Long> boff = consumer.beginningOffsets(tps);
                Map<TopicPartition, Long> eoff = consumer.endOffsets(tps);
                System.out.println("Offsets:" + boff + "," + eoff);
            }
        });
        // 刷新相关
        kafkaAPI.autoFlush(true);//设为true,每次发送后刷新

        //........

        //关闭释放资源
        kafkaAPI.close();

    }
}

KafkaAPI JavaDOC https://docs.awspaas.com/api/aws-api-javadoc/com/actionsoft/sdk/local/api/cc/KafkaAPI.html

3. 流程编排服务

该应用提供Kafka Producer(向Kafka发送消息)的流程编排服务(Service Task)。进行连接服务发布页签可进行查看。

调用

操作步骤:

  1. 创建一个流程模型
  2. 拖动一个系统任务至画布
  3. 配置系统任务,服务类型为流程服务,选择该流程服务,配置相关信息
  4. 完成流程模型的正确设计,创建流程实例,到任务到达系统任务时,即可发送消息

4.日志

当连接器开启记录访问请求到审计日志记录返回结果到文件日志开关后,调用KafkaAPI相关方法时,可记录日志,具体哪些方法会记录日志,请参见javadoc KafkaAPI说明。

记录访问请求到审计日志

开启后,将访问请求记录到审计日志,该日志存储在AWS PaaS平台SYS_AUDIT_LOG表中,可在日志页进行查看。

记录返回结果到文件日志

开启后,将请求数据和返回结果记录到文件日志,该日志存储在AWS PaaS平台%AWS_HOME%/logs/目录下,默认每个文件日志最大为20M,最多存储20个文件,超出将自动清除最早日期文件。 log文件个数及单文件最大值可在%AWS_HOME%/bin/conf/aws-log4j.xml文件中配置。

5.监控

使用该功能要求AWS PaaS平台许可支持SLA服务质量监控。

开启SLA服务质量监控后,当调用RDSAPI时,将自动对调用次数、出错次数、执行耗时进行监控。 并可通过SLA告警监控策略对监控数据进行告警。

当配置了告警监控且触发后,可快速查看告警信息列表。

6.删除

在连接列表,光标移至需要删除的模型上,点击右侧删除按钮,按钮提示进行删除。

7.应用信息

显示当前模型所属应用信息及受管状态。在不同环境中,支持AWS PaaS的模型受管(Managed)控制。有关模型受管详细参见https://docs.awspaas.com/reference-guide/aws-paas-env-guide/managed/README.html

8.DevOps

显示当前模型的开发维护权限,可查看到具体人员信息。有关权限的设置参见https://docs.awspaas.com/user-manual/aws-pass-console-user-manual-amc/application_develop/README.html#a



Seven
 010-62962343-690
 liujx@actionsoft.com.cn
感谢您对该文档的关注!如果您对当前页面内容有疑问或好的建议,请与我联系。如果您需要解答相关技术问题请登录AWS客户成功社区