1. Overview
Apache Kafka is a distributed event streaming platform capable of handling trillions of events a day. It is used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
The GoInsight Kafka node allows you to interact directly with your Kafka clusters within your automation workflows. You can seamlessly produce messages and retrieve metadata about your topics and partitions. Key capabilities include:
- Producing Messages: Send records or messages to a specified topic.
- Topic Management: List all available topics or retrieve details for a specific topic.
- Partition Inspection: Get metadata for all partitions within a topic or for a single, specific partition.
2. Prerequisites
Before using this node, you need to have access to a running Apache Kafka cluster. You will also need the necessary connection details (such as broker addresses and any required authentication credentials) to connect to your cluster.
3. Credentials
For a detailed guide on how to obtain and configure credentials, please refer to our official documentation: Credentials Configuration Guide.
4. Supported Operations
Summary
This node operates on three resources: Message, Partition, and Topic.
| Resource | Operation | Description |
|---|---|---|
| Message | Create a Produce Message | Produce messages to an Apache Kafka topic by topic name. |
| Partition | Get Many Partitions | Get metadata for all partitions of a topic in Apache Kafka by topic name. |
| Partition | Get a Partition | Get metadata for a single partition of a topic in Apache Kafka by topic name. |
| Topic | Get Many Topics | Get all topics from the Kafka cluster. |
| Topic | Get a Topic | Get a topic from Apache Kafka by topic name. |
Operation Details
Create a Produce Message
Produce messages to an Apache Kafka topic by topic name.
Input parameters:
- TopicName: The name of the Kafka topic to produce messages to.
- Records: The list of message records to produce to the topic. Each record must be a JSON object with a required value (the message payload) and an optional key (a partition key used for routing). Example: [{"key": "user-123", "value": {"name": "John", "action": "login"}}]
Optional parameters:
- PartitionId: The target partition ID (zero-based index). Defaults to 0. To find a topic's partition count, use the Get a Topic operation. If a key field is provided, messages are distributed across partitions based on that key.
Output:
- ProduceResult (object): The Kafka produce result, containing offsets (a list of partition offsets for the written messages, each with partition, offset, and error_code), key_schema_id, and value_schema_id. Example: {"offsets": [{"partition": 0, "offset": 42, "error_code": null}]}
- OriginalStatusCode (number): The raw HTTP status code returned by the upstream API. Useful for debugging.
- StatusCode (number): The operation status code: 200 = success (check ErrorMessage for business errors), -1 = parameter validation error, 500 = system error (may be retried).
- ErrorMessage (string): A detailed error message if any error occurred; an empty string on success.
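The Records and ProduceResult shapes described above can be sketched in Python. The helper names below (`build_records`, `summarize_produce_result`) are illustrative only and not part of the node; the field names follow the examples in this section:

```python
import json

def build_records(messages):
    """Build a Records-style payload: a list of {key, value} objects.

    `value` carries the message payload; `key` is an optional partition key.
    Messages are given as (key, value) pairs; a key of None is omitted.
    """
    records = []
    for key, value in messages:
        record = {"value": value}
        if key is not None:
            record["key"] = key
        records.append(record)
    return records

def summarize_produce_result(result):
    """Count successful vs failed writes in a ProduceResult-shaped dict
    by inspecting error_code on each offsets entry."""
    ok = sum(1 for o in result["offsets"] if o.get("error_code") is None)
    failed = len(result["offsets"]) - ok
    return ok, failed

records = build_records([("user-123", {"name": "John", "action": "login"})])
print(json.dumps(records))

sample = {"offsets": [{"partition": 0, "offset": 42, "error_code": None}]}
print(summarize_produce_result(sample))  # (1, 0)
```

A downstream node could use a summary like this to decide whether all records in a batch were written.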
Get Many Partitions
Get metadata for all partitions of a topic in Apache Kafka by topic name.
Input parameters:
- TopicName: The name of a topic in the Kafka cluster. Use the exact topic name (case-sensitive). You can obtain available topic names from the Get Many Topics operation. Example: user-events
Output:
- Partitions (object-array): An array of partition metadata objects from Kafka. Each partition contains: partition (the partition ID), leader (the broker ID of the partition leader), replicas (the list of broker IDs hosting replicas), and isr (the list of in-sync replica broker IDs). Example: [{"partition": 0, "leader": 1, "replicas": [1,2,3], "isr": [1,2]}]
- OriginalStatusCode (number): The raw HTTP status code returned by the Kafka REST Proxy API. A default value of 0 means the request never reached the upstream (for example, on a timeout). Useful for debugging.
- StatusCode (number): The operation status code: 200 = success (check ErrorMessage for business errors), -1 = parameter validation error, 500 = system error (may be retried).
- ErrorMessage (string): A detailed error message if any error occurred; an empty string on success.
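A common use of the Partitions output is a health check: a partition whose isr list is shorter than its replicas list is under-replicated. A minimal sketch over the documented shape (the function name is illustrative):

```python
def under_replicated(partitions):
    """Return the IDs of partitions whose in-sync replica set (isr)
    is smaller than the full replica list."""
    return [p["partition"]
            for p in partitions
            if len(p["isr"]) < len(p["replicas"])]

parts = [
    {"partition": 0, "leader": 1, "replicas": [1, 2, 3], "isr": [1, 2]},
    {"partition": 1, "leader": 2, "replicas": [1, 2, 3], "isr": [1, 2, 3]},
]
print(under_replicated(parts))  # [0]
```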
Get a Partition
Get metadata for a single partition of a topic in Apache Kafka by topic name.
Input parameters:
- TopicName: The name of the Kafka topic (case-sensitive). Can be obtained from the Get Many Topics operation. Example: 'orders-topic'
Optional parameters:
- PartitionId: The ID of the partition to inspect (zero-based). Can be obtained from the Get Many Partitions operation. Defaults to partition 0 if not specified.
Output:
- Partition (object): The partition metadata object, containing partition (the partition ID), leader (the broker ID of the leader replica), replicas (the list of broker IDs hosting replicas), and isr (the list of broker IDs in the in-sync replica set). Example: {"partition": 0, "leader": 1, "replicas": [1, 2, 3], "isr": [1, 2]}
- OriginalStatusCode (number): The raw HTTP status code returned by the upstream API. Useful for debugging.
- StatusCode (number): The operation status code: 200 = success (check ErrorMessage for business errors), -1 = parameter validation error, 500 = system error (may be retried).
- ErrorMessage (string): A detailed error message if any error occurred; an empty string on success.
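Every operation in this node reports the same StatusCode/ErrorMessage convention, so a downstream step can classify outcomes uniformly. A sketch of that logic (the function name and return labels are illustrative, not part of the node):

```python
def classify(status_code, error_message):
    """Map the node's status convention to a follow-up action.

    200 -> success, unless ErrorMessage is non-empty (business error);
    -1  -> parameter validation error (fix the inputs, do not retry);
    500 -> system error (may be retried).
    """
    if status_code == 200:
        return "business-error" if error_message else "ok"
    if status_code == -1:
        return "invalid-params"
    if status_code == 500:
        return "retry"
    return "unknown"

print(classify(200, ""))       # ok
print(classify(500, "boom"))   # retry
```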
Get Many Topics
Get all topics from the Kafka cluster.
Output:
- Topics (object-array): An array of topic objects retrieved from Kafka. Each topic object contains name (the topic name). Note: this operation returns only the list of topic names; for details about a topic, use the Get a Topic operation.
- OriginalStatusCode (number): The raw HTTP status code returned by the upstream API. A default value of 0 means the request never reached the upstream (for example, on a timeout). Useful for debugging.
- StatusCode (number): The operation status code: 200 = success (check ErrorMessage for business errors), -1 = parameter validation error, 500 = system error (may be retried).
- ErrorMessage (string): A detailed error message if any error occurred; an empty string on success.
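Since each Topics entry carries only a name field, a typical follow-up step flattens the array into plain strings (optionally filtering by prefix) before feeding individual names into Get a Topic. A minimal sketch with an illustrative helper name:

```python
def topic_names(topics, prefix=""):
    """Flatten the Topics output (objects carrying only `name`)
    into a list of topic-name strings, optionally filtered by prefix."""
    return [t["name"] for t in topics if t["name"].startswith(prefix)]

topics = [{"name": "user-events"}, {"name": "user-signups"}, {"name": "orders"}]
print(topic_names(topics, prefix="user-"))  # ['user-events', 'user-signups']
```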
Get a Topic
Get a topic from Apache Kafka by topic name.
Input parameters:
- TopicName: The name of the topic to retrieve.
Output:
- Topic (object): The topic data retrieved from the Kafka REST Proxy, containing: name (the topic name), configs (topic configuration settings), partition_count (the number of partitions), and replication_factor (the replication factor). Example: {"name": "my-topic", "configs": {"retention.ms": "604800000"}, "partition_count": 3, "replication_factor": 2}
- OriginalStatusCode (number): The raw HTTP status code returned by the upstream API. Useful for debugging.
- StatusCode (number): The operation status code: 200 = success (check ErrorMessage for business errors), -1 = parameter validation error, 500 = system error (may be retried).
- ErrorMessage (string): A detailed error message if any error occurred; an empty string on success.
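Note that config values in the Topic output arrive as strings (for example "retention.ms": "604800000"), so they must be parsed before use. A small sketch converting the retention setting into days (the helper name is illustrative):

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # 86_400_000

def retention_days(topic):
    """Parse retention.ms from a Topic output's configs
    (values are strings) and convert it to days."""
    ms = int(topic["configs"]["retention.ms"])
    return ms / MS_PER_DAY

t = {"name": "my-topic", "configs": {"retention.ms": "604800000"},
     "partition_count": 3, "replication_factor": 2}
print(retention_days(t))  # 7.0 (604800000 ms = one week)
```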
5. Example Usage
This section will guide you through creating a simple workflow to send a message to a Kafka topic using the Create a Produce Message operation.
Workflow Overview
The workflow will consist of three nodes: Start -> Create a Produce Message -> Answer.
Step-by-Step Guide
- Add the Tool Node:
- In the workflow canvas, click the "+" button to add a new node.
- In the panel that appears, select the "Tools" tab.
- Find and select "Kafka" from the list of tools.
- From the list of supported operations for Kafka, click on Create a Produce Message to add the node to your canvas.
- Configure the Node:
- Click on the newly added Create a Produce Message node to open its configuration panel on the right.
- Configure Credentials: In the credentials field at the top of the panel, click the dropdown menu and select your pre-configured Kafka credentials.
- Fill in Parameters: Complete the input fields as follows:
- TopicName: Enter the name of the topic you want to send a message to, for example, user-signups.
- Records: This field expects an array of objects, where each object represents a message. You can use a JSON editor or reference output from a previous node. For a simple test, you can enter a static value like:
[ { "key": "user123", "value": "{\"name\": \"Alice\", \"event\": \"signup\"}" } ]
- Run and Validate:
- Once all required parameters are correctly filled, any error indicators on the workflow canvas will disappear.
- Click the "Run" button in the top-right corner of the canvas to execute the workflow.
- After a successful execution, you can click the log icon in the top-right corner to view the detailed inputs and outputs of the node, confirming that the message was sent successfully.
After completing these steps, your workflow is fully configured. When executed, it will send the specified record to your Kafka topic.
6. FAQs
Q: I'm getting a connection error. What should I check?
A: Connection errors are often related to incorrect configuration or network issues. Please verify the following:
- Broker Addresses: Ensure the broker addresses in your credentials are correct and reachable from the GoInsight environment.
- Authentication: Double-check that your authentication mechanism (e.g., SASL/SCRAM) and credentials are correct.
- Network/Firewall: Confirm that there are no firewalls or network security groups blocking the connection between GoInsight and your Kafka brokers.
Q: How should I format the Records parameter for the "Create a Produce Message" operation?
A: The Records parameter must be an array of JSON objects. Each object in the array represents a single message to be sent to Kafka. A message object should contain a value and can optionally include a key. The value should typically be a stringified JSON object.
- Example:
[ { "key": "order-456", "value": "{\"productId\": \"prod-abc\", \"quantity\": 2, \"price\": 50.00}" }, { "key": "order-457", "value": "{\"productId\": \"prod-xyz\", \"quantity\": 1, \"price\": 120.50}" } ]
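Since the value field is a stringified JSON object, building it by hand invites escaping mistakes; serializing the payload with json.dumps is safer. A minimal sketch (the helper name is illustrative):

```python
import json

def make_record(key, payload):
    """Build one Records entry, serializing the payload dict into
    the stringified-JSON `value` the examples above use."""
    return {"key": key, "value": json.dumps(payload)}

rec = make_record("order-456", {"productId": "prod-abc", "quantity": 2, "price": 50.00})
print(rec["value"])
```

Parsing rec["value"] back with json.loads recovers the original payload, which is a quick way to confirm the escaping is correct.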
7. Official Documentation
For more in-depth information about Apache Kafka and its concepts, please refer to the official documentation.