Overview

Introduction

Inlong sort standalone is a module responsible for consuming the data stream reported by users from the cache layer and distributing it to different data stores. It supports hive, elasticsearch, CLS and other data stores.
Inlong sort standalone relies on inlong manager to manage system metadata. Inlong sort standalone is deployed by cluster and aggregates and distributes tasks by target storage.

Feature

Multi tenant system

Inlong sort standalone supports multi tenancy. An inlong sort standalone cluster can host the distribution tasks of different tenants. The distribution tasks are obtained from the inlong manager.
Each distribution task is responsible for distributing multiple data streams to a data store. Users only need to configure on the front page of inlong manager to specify the data streams to be distributed to a specific data store.
For example, the inlong data streams D1 and D2 are distributed to hive cluster H1, D1 is also distributed to elasticsearch cluster E1, and D2 is also distributed to CLS cluster C1. Then the inlong sort standalone cluster will receive three distribution tasks.

  • H1 distributes task consumption D1 and D2 to hive cluster H1;
  • E1 distribution task consumption D1, distributed to elasticsearch cluster E1;
  • C1 distributes the task consumption D2 and distributes it to CLS cluster C1.

Distribution tasks support dynamic updates

Inlong sort standalone supports dynamic updating of distribution tasks, such as the information of the data source where the inlong data stream is located, the data stream schema information, and the information of the target data store.
It should be noted that the new distribution of inlong data stream will be consumed from the latest location of the cache layer;
After the inlong data stream is distributed offline, it goes online again. If the consumption location when it goes offline is still within the life cycle of the cache layer, it continues to consume from the consumption location when it goes offline;
If the consumption location at the time of offline is no longer within the life cycle of the cache layer, consumption starts from the latest location of the cache layer.

message queues supported by the cache layer

  • Inlong-Tube
  • Apache Pulsar
  • Apache Kafka

supported data storage

  • Apache Hive (currently only supports sequence file format)
  • Apache Pulsar
  • Apache Kafka
  • Elasticsearch
  • ClickHouse
  • Tencent CLS

Future planning

support more kinds of data storage

HBase, etc.

support more file formats written to Apache hive

Orc file, etc.

Configuration in Sort-Standalone

The configuration in Sort-Standalone can be divided into three parts:

  • Basic node configuration: including the type of source, channel, sink this node try to specify, and how to acquire the metadata of each group id and stream id. These configs will not be updated since the process starts. They will be put in common.properties
  • Sink configuration: including the metadata of sinks, such as access point, username, password. And the metadata of each id which sinks require. These configs will be put in SortClusterConfig
  • Source configuration: including the metadata of sources, such as mq type, topic, serviceUrl. These configs will be put in SortSourceConfig

Configuration file:conf/common.properties

ParameterRequiredDefaultValueRemark
clusterIdYNAinlong-sort-standalone cluster id
sortSource.typeNorg.apache.inlong.sort.standalone.source.readapi.SortSdkSourceSource class name
sortChannel.typeNorg.apache.inlong.sort.standalone.channel.BufferQueueChannelChannel class name
sortSink.typeNorg.apache.inlong.sort.standalone.sink.hive.HiveSinkSink class name
sortClusterConfig.typeYmanagerClusterConfig configuration type. File config mode please choose: file
sortSourceConfig.QueryConsumeConfigTypeYmanagerSourceConfig configuration type. File config mode please choose: file
sortClusterConfig.fileNSortClusterConfig.confSort cluster config file name. Required in file config type.
managerUrlLoaderTypeNorg.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoaderThe type of manager url loader. Required in manager config type.
sortClusterConfig.managerUrlNhttp://${manager_ip}:{manager_port}/inlong/manager/openapi/sort/getClusterConfigThe manager config interface to cluster config. Required in manager config type.
sortSourceConfig.managerUrlNhttp://${manager_ip}:{manager_port}/inlong/manager/openapi/sort/getSortSourceThe manager config interface to sort source config. Required in manager config mode.
eventFormatHandlerNorg.apache.inlong.sort.standalone.sink.hive.DefaultEventFormatHandlerFormatter class name
maxThreadsN10sink thread number
reloadIntervalN60000interval updating Configuration data(millisecond)
processIntervalN100interval processing data(millisecond)
metricDomainsNSortdomain name of metric
metricDomains.Sort.domainListenersNorg.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListenerclass name list of metric listener, separated by space
prometheusHttpPortN8080HTTP server port of prometheus simple client
metricDomains.Sort.snapshotIntervalN60000interval snapshoting metric data(millisecond)
sortsdk.consumeStrategyNlatestThe consume strategy of topic, other option is earliest
interceptor.typeNorg.apache.inlong.sort.standalone.rollback.TimeBasedFilterInterceptor$BuilderType of interceptor used in sort-standalone. The default interceptor is based on message time.
rollback.startTimeN1970-01-01 08:00:00Valid message start time. Earlier message will be filtered.
rollback.stopTimeNNAValid message stop time. Later message will be filtered.

SortClusterConfig

The SortClusterConfig can be acquired from local file or manager, depending on the sortClusterConfig.type configured in common.properties.

ParameterRequiredDefaultValueRemark
clusterNameYNAinlong-sort-standalone cluster id
sortTasksYNASort task list

SortTask

Sort Task includes idParams and sinkParams which represent the config of stream and dataNode respectively

The idParams and sinkParams are totally different among each type of sort tasks. There are two examples of hive task and pulsar task

ParameterRequiredDefaultValueRemark
nameYNAsort task name
typeYNAsort task type, for example:HIVE(“hive”), TUBE(“tube”), KAFKA(“kafka”), PULSAR(“pulsar”), ElasticSearch(“ElasticSearch”), UNKNOWN(“n”)
idParamsYNAList of Inlong DataStream configuration
sinkParamsYNASort task parameters

idParams content of Hive sort task

ParameterRequiredDefaultValueRemark
inlongGroupIdYNAinlongGroupId
inlongStreamIdYNAinlongStreamId
separatorYNAseparator
partitionIntervalMsN3600000partition interval(millisecond)
idRootPathYNAHDFS root path of Inlong DataStream
partitionSubPathYNApartition sub path of Inlong DataStream
hiveTableNameYNAHive table name of Inlong DataStream
partitionFieldNameNdtpartition field name of Inlong DataStream
partitionFieldPatternYNADate format of partition field value, the type have {yyyyMMdd},{yyyyMMddHH},{yyyyMMddHHmm}
msgTimeFieldPatternYNADate format of message generation time, it support Java date format
maxPartitionOpenDelayHourN8Max delay hour of partition(hour)

sinkParams content of Hive sort task

ParameterRequiredDefaultValueRemark
hdfsPathYNANameNode URL of HDFS
maxFileOpenDelayMinuteN5Max writing delay minute of simple HDFS file(minute)
tokenOvertimeMinuteN60token overtime of Inlong Data Stream(minute)
maxOutputFileSizeGbN2Max file size of simple HDFS file(GB)
hiveJdbcUrlYNAJDBC URL of Hive
hiveDatabaseYNAHive database
hiveUsernameYNAHive username
hivePasswordYNAHive password

idParams content of Pulsar sort task

ParameterRequiredDefaultValueRemark
inlongGroupIdYNAinlongGroupId
inlongStreamIdYNAinlongStreamId
topicYNATopic of MQ

sinkParams content of Pulsar sort task

ParameterRequiredDefaultValueRemark
serviceUrlYNAPulsar service URL
authenticationYNAPulsar authentication
enableBatchingNtrueenableBatching
batchingMaxBytesN5242880batchingMaxBytes
batchingMaxMessagesN3000batchingMaxMessages
batchingMaxPublishDelayN1batchingMaxPublishDelay
maxPendingMessagesN1000maxPendingMessages
maxPendingMessagesAcrossPartitionsN50000maxPendingMessagesAcrossPartitions
sendTimeoutN0sendTimeout
compressionTypeNNONEcompressionType
blockIfQueueFullNtrueblockIfQueueFull
roundRobinRouterBatchingPartitionSwitchFrequencyN10roundRobinRouterBatchingPartitionSwitchFrequency

SortSourceConfig

ParameterRequiredTypeDefaultValueRemark
sortClusterNameYStringNAinlong-sort-standalone cluster id
sortTaskIdYStringNASort task name
cacheZonesYJsonObject<String, JsonObject>NACache cluster list, Map<cacheClusterName, CacheCluster>

CacheZones

ParameterRequiredTypeDefaultValueRemark
zoneNameYStringNAcache cluster name
zoneTypeYStringNA[pulsar,tube,kafka]
serviceUrlYStringNAPulsar serviceUrl or Kafka broker list
authenticationYStringNAPulsar authentication
cacheZonePropertiesNMap<String,String>NACache consumer configuration
topicsNList<Topic>NATopic list of Cache consumer

Topics

ParameterRequiredTypeDefaultValueRemark
topicYStringNAcache topic name
partitionCntYIntegerNAcache topic partition count
topicPropertiesNMap<String,String>NACache topic configuration