Kafka 实时数据订阅
简介
GrowingIO 为开放架构,实时采集的用户行为数据支持订阅以满足更多的使用场景,如基于用户事件触发的商品推荐、消息推送等。
能力边界和约束
- 通过jar依赖的形式,获取Kafka里的原始数据,不包含后续数据流中GrowingIO添加的字段
- 订阅的机器需要与GrowingIO私有化部署的机器在同一个内网,且可以解析GrowingIO服务器的host
能力说明
Topic 范围
用户行为数据
Topic: cdp-event-collect 行为事件,包含 访问,埋点,页面,点击事件
用户属性数据
Topic: cdp-user-props-collect 用户属性
维度表数据
Topic: cdp-item-collect 维度表
安装依赖
GrowingIO提供了集成 kafka 里面的protobuf二进制数据转换成数据模型字段的数据工具类,所在的jar包已提交到Maven中央仓库中,需要通过 Maven 给项目引入依赖的jar包。 项目引入依赖的jar包需要在pom.xml中添加如下配置信息:
<dependency>
<groupId>io.growing.data.utils.connector</groupId>
<artifactId>gio-data-connector</artifactId>
<version>1.0.4</version>
</dependency>
pom中引入依赖后,刷新Maven拉取jar包到本地
刷新Maven获取jar包时,若报错: Failed to read artifact descriptor for io.growing.data.utils.connector:gio-data-connector:jar:standalone:1.0.1
解决方案:在Maven缺省的本地仓库路径.m2下修改settings.xml文件,增加中央仓库地址的配置,内容参加如下:
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<offline>false</offline>
<servers>
</servers>
<mirrors>
<mirror>
<id>huaweicloud</id>
<mirrorOf>central</mirrorOf>
<url>https://repo.huaweicloud.com/repository/maven/</url>
</mirror>
</mirrors>
<profiles>
<profile>
<id>defaultProfile</id>
<repositories>
<repository>
<id>gio</id>
<name>local private nexus</name>
<url>https://nexus.growingio.cn/repository/maven-public</url>
<releases>
<enabled>true</enabled>
<checksumPolicy>warn</checksumPolicy>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>huawei</id>
<name>huawei</name>
<url>https://repo.huaweicloud.com/repository/maven/</url>
</repository>
<repository>
<id>flink snapshot</id>
<name>flink snapshot</name>
<url>https://repository.apache.org/content/repositories/snapshots</url>
</repository>
<repository>
<id>oss.sonatype.org-snapshot</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>alimaven</id>
<name>aliyun maven plugin repo</name>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
<activeProfiles>
<activeProfile>defaultProfile</activeProfile>
</activeProfiles>
</settings>
参数配置
Jar 内部提供通过 KafkaSource 自动消费kafka 中的数据,需先配置 gio-kafka.properties 放到 classpath 中。
# kafka server 地址和端口。多个地址用英文逗号分隔
bootstrap.servers={kafka地址1}:9092,{kafka地址2}:9092,{kafka地址3}:9092 ...
# 消费者组
group.id=sdk-demo
数据格式及转换工具
转换工具类
通过3个数据转换工具类,将 kafka 中的 protobuf 二进制数据转换成数据模型字段。
- 获取行为事件数据
EventViewDto eventView = new EventDtoTransformer().transform(protobufBytes)
- 获取用户属性数据
UserViewDto eventView = new UserDtoTransformer().transform(protobufBytes)
- 获取维度表数据
ItemViewDto itemView = new ItemDtoTransformer().transform(protobufBytes)
消费 kafka 数据示例
使用 KafkaSource 消费行为数据的示例,更多例子参考 io.growing.data.utils.connector.examples.ConsumeKafkaSource
import io.growing.data.utils.connector.dto.EventViewDto;
import io.growing.data.utils.connector.dto.ItemViewDto;
import io.growing.data.utils.connector.dto.UserViewDto;
import io.growing.data.utils.connector.source.DataSource;
import io.growing.data.utils.connector.source.KafkaSource;
import io.growing.data.utils.connector.transformer.Transformer;
import io.growing.data.utils.connector.transformer.EventDtoTransformer;
import io.growing.data.utils.connector.transformer.ItemDtoTransformer;
import io.growing.data.utils.connector.transformer.UserDtoTransformer;
public class Demo {
static DataSource source = new KafkaSource();
public static void main( String[] args ) {
consumeEventView();
}
public static void consumeEventView() {
Transformer<EventViewDto> event = new EventDtoTransformer(); //数据转换类
source.open(event.messageTypes()); //数据类型
source.consume(event, new OutputEventView());
}
}
public class OutputEventView implements OutputMessage<EventViewDto>{
@Override
public EventViewDto output(EventViewDto input) {
System.out.println("event_key: " + input.getEventKey());
return null;
}
}
数据字典
用户行为数据字典
Kafka字段 | 事件表字段(2.0版) | 含义 |
---|---|---|
String eventKey; | event_key | 事件标识符 |
long eventTime; | event_time | 事件接收时间(毫秒时间戳) |
long clientTime; | client_time | 事件发生时间(毫秒时间戳) |
String anonymousUser; | anonymous_user | 访问用户ID |
String userId; | user | 登录用户ID |
String userKey; | user_key | 用户身份类型 |
String session; | session | 会话标识,标记一个访问 |
Map<String, String> attributes; | attributes | 事件属性 |
String packageName; | package | APP包名/Web域名/小程序APPID |
String platform; | $platform | 平台标识,示例:Web |
String referrerDomain; | $referrer_domain | 来源域名或包名 |
String path; | $path | 页面路径 |
String title; | $title | 页面标题 |
String query; | $query | 页面query参数 |
String keyWord; | $key_word | 访问来源广告关键字 |
String accountId; | account_id | 系统账户ID,由SDK集成时设置 |
String domain; | $domain | 域名或包名 |
String ip; | $ip | 客户端ID地址 |
String userAgent; | $user_agent | 浏览器 agent 详细信息 |
String sdkVersion; | $sdk_version | SDK版本号 |
String dataSourceId; | $data_source_id | 数据源信息 |
如上attributes字段中预定义属性如下 | 字段 | 含义 | | --- | --- | | $xpath | 元素在页面中的位置 | | $text_value | 元素对应的文本名 | | $href | 元素对应的链接 | | $index | 元素在列表中的位置,0开始 |
用户属性数据字典
Kafka字段 | 含义 |
---|---|
String accountId; | 系统账户ID,由SDK集成时设置 |
String propKey; | 属性标识符 |
String propValue; | 属性值 |
long sendTime; | 属性接收时间 |
String userKey; | 用户身份 |
String userId; | 登录用户ID |
String anonymousId; | 匿名用户ID |
维度表数据字典
Kafka字段 | 含义 |
---|---|
String accountId; | 系统账户ID,由SDK集成时设置 |
String propKey; | 维度表字段标识符 |
String propValue; | 维度表字段值 |
String itemKey; | 维度表标识符 |
String itemValue; | 维度表记录ID |
常见问题
Q:支持订阅历史数据吗?
A:可以。gio-kafka.properties 配置文件中加入 auto.offset.reset=earliest,可以订阅 Kafka 中保存的历史数据。
Q:能指定数据源 ID 订阅 Kafka 数据吗?
A:不能指定数据源 ID 订阅,但可以在消费代码中按照数据源 ID 过滤,提取需要的数据。
Q:能按组订阅 Kafka 数据吗?
A:可以。gio-kafka.properties 配置文件中定义 group.id,区分不同的消费组。
Q:Kafka 订阅不到数据可能的原因
A:多半跟网络不通有关系,请确认通过 telnet hostname 9092 能访问 GrowingIO 部署的服务器。