Create
Creates a new Kafka topic in the specified cluster.
- TypeScript
- Python
import {
  cloudApi,
  decodeMessage,
  serviceClients,
  Session,
  waitForOperation,
} from "@yandex-cloud/nodejs-sdk";

// Aliases for the generated Kafka API message/enum types used below.
const CompressionType = cloudApi.mdb.kafka_common.CompressionType;
const CreateTopicRequest = cloudApi.mdb.kafka_topic_service.CreateTopicRequest;
const Topic = cloudApi.mdb.kafka_topic.Topic;
const TopicConfig2_8_CleanupPolicy =
  cloudApi.mdb.kafka_topic.TopicConfig2_8_CleanupPolicy;
const TopicConfig3_CleanupPolicy =
  cloudApi.mdb.kafka_topic.TopicConfig3_CleanupPolicy;

(async () => {
  // Authenticate with an OAuth token taken from the environment.
  const authToken = process.env["YC_OAUTH_TOKEN"];
  if (!authToken) {
    throw new Error("YC_OAUTH_TOKEN environment variable is not set");
  }
  const session = new Session({ oauthToken: authToken });
  const client = session.client(serviceClients.TopicServiceClient);

  // Start the long-running Create operation. Only clusterId is required;
  // uncomment the fields below to customize the topic spec.
  const operation = await client.create(
    CreateTopicRequest.fromPartial({
      clusterId: "clusterId",
      topicSpec: {
        // name: "name",
        // partitions: {
        //   value: 0
        // },
        // replicationFactor: {
        //   value: 0
        // },
        // topicConfig_2_8: {
        //   cleanupPolicy: TopicConfig2_8_CleanupPolicy.CLEANUP_POLICY_DELETE,
        //   compressionType: CompressionType.COMPRESSION_TYPE_UNCOMPRESSED,
        //   deleteRetentionMs: {
        //     value: 0
        //   },
        //   fileDeleteDelayMs: {
        //     value: 0
        //   },
        //   flushMessages: {
        //     value: 0
        //   },
        //   flushMs: {
        //     value: 0
        //   },
        //   minCompactionLagMs: {
        //     value: 0
        //   },
        //   retentionBytes: {
        //     value: 0
        //   },
        //   retentionMs: {
        //     value: 0
        //   },
        //   maxMessageBytes: {
        //     value: 0
        //   },
        //   minInsyncReplicas: {
        //     value: 0
        //   },
        //   segmentBytes: {
        //     value: 0
        //   },
        //   preallocate: {
        //     value: true
        //   }
        // },
        // topicConfig_3: {
        //   cleanupPolicy: TopicConfig3_CleanupPolicy.CLEANUP_POLICY_DELETE,
        //   compressionType: CompressionType.COMPRESSION_TYPE_UNCOMPRESSED,
        //   deleteRetentionMs: {
        //     value: 0
        //   },
        //   fileDeleteDelayMs: {
        //     value: 0
        //   },
        //   flushMessages: {
        //     value: 0
        //   },
        //   flushMs: {
        //     value: 0
        //   },
        //   minCompactionLagMs: {
        //     value: 0
        //   },
        //   retentionBytes: {
        //     value: 0
        //   },
        //   retentionMs: {
        //     value: 0
        //   },
        //   maxMessageBytes: {
        //     value: 0
        //   },
        //   minInsyncReplicas: {
        //     value: 0
        //   },
        //   segmentBytes: {
        //     value: 0
        //   },
        //   preallocate: {
        //     value: true
        //   }
        // }
      },
    })
  );

  // Block until the operation finishes, then decode the Topic payload.
  const finishedOp = await waitForOperation(operation, session);
  if (finishedOp.response) {
    const result = decodeMessage<typeof Topic>(finishedOp.response);
    console.log(result);
  }
})().catch((err) => {
  // Surface failures instead of leaving the IIFE's rejection unhandled.
  console.error(err);
  process.exitCode = 1;
});
import os
import grpc
import yandexcloud
from yandex.cloud.mdb.kafka.v1.common_pb2 import CompressionType
from yandex.cloud.mdb.kafka.v1.topic_service_pb2 import CreateTopicMetadata
from yandex.cloud.mdb.kafka.v1.topic_service_pb2 import CreateTopicRequest
from yandex.cloud.mdb.kafka.v1.topic_pb2 import Topic
from yandex.cloud.mdb.kafka.v1.topic_pb2 import TopicConfig2_8
from yandex.cloud.mdb.kafka.v1.topic_pb2 import TopicConfig3
from yandex.cloud.mdb.kafka.v1.topic_service_pb2_grpc import TopicServiceStub
from yandex.cloud.mdb.kafka.v1.topic_pb2 import TopicSpec
# Authenticate with an OAuth token from the environment and build the SDK client.
token = os.getenv("YC_OAUTH_TOKEN")
sdk = yandexcloud.SDK(token=token)
service = sdk.client(TopicServiceStub)
# Start the long-running Create operation. Only cluster_id is required here;
# uncomment the fields below to customize the topic spec.
# NOTE: the commented examples use the Int64Value/BoolValue wrapper types —
# add `from google.protobuf.wrappers_pb2 import BoolValue, Int64Value`
# before uncommenting them.
operation = service.Create(
CreateTopicRequest(
cluster_id="clusterId",
topic_spec=TopicSpec(
# name = "name",
# partitions = Int64Value(
# value = 0
# ),
# replication_factor = Int64Value(
# value = 0
# ),
# topic_config_2_8 = TopicConfig2_8(
# cleanup_policy = TopicConfig2_8.CleanupPolicy.CLEANUP_POLICY_DELETE,
# compression_type = CompressionType.COMPRESSION_TYPE_UNCOMPRESSED,
# delete_retention_ms = Int64Value(
# value = 0
# ),
# file_delete_delay_ms = Int64Value(
# value = 0
# ),
# flush_messages = Int64Value(
# value = 0
# ),
# flush_ms = Int64Value(
# value = 0
# ),
# min_compaction_lag_ms = Int64Value(
# value = 0
# ),
# retention_bytes = Int64Value(
# value = 0
# ),
# retention_ms = Int64Value(
# value = 0
# ),
# max_message_bytes = Int64Value(
# value = 0
# ),
# min_insync_replicas = Int64Value(
# value = 0
# ),
# segment_bytes = Int64Value(
# value = 0
# ),
# preallocate = BoolValue(
# value = True
# )
# ),
# topic_config_3 = TopicConfig3(
# cleanup_policy = TopicConfig3.CleanupPolicy.CLEANUP_POLICY_DELETE,
# compression_type = CompressionType.COMPRESSION_TYPE_UNCOMPRESSED,
# delete_retention_ms = Int64Value(
# value = 0
# ),
# file_delete_delay_ms = Int64Value(
# value = 0
# ),
# flush_messages = Int64Value(
# value = 0
# ),
# flush_ms = Int64Value(
# value = 0
# ),
# min_compaction_lag_ms = Int64Value(
# value = 0
# ),
# retention_bytes = Int64Value(
# value = 0
# ),
# retention_ms = Int64Value(
# value = 0
# ),
# max_message_bytes = Int64Value(
# value = 0
# ),
# min_insync_replicas = Int64Value(
# value = 0
# ),
# segment_bytes = Int64Value(
# value = 0
# ),
# preallocate = BoolValue(
# value = True
# )
# )
),
)
)
# Wait for the operation to complete and decode the resulting Topic message.
operation_result = sdk.wait_operation_and_get_result(
operation,
response_type=Topic,
meta_type=CreateTopicMetadata,
)
print(operation_result)
CreateTopicRequest
clusterId
: string
ID of the Apache Kafka® cluster to create a topic in.
To get the cluster ID, make a ClusterService.List request.
topicSpec
: TopicSpec
Configuration of the topic to create.
TopicSpec
name
: string
Name of the topic.
partitions
: google.protobuf.Int64Value
The number of the topic's partitions.
replicationFactor
: google.protobuf.Int64Value
The number of copies of the topic data kept in the cluster.
One of topicConfig
User-defined settings for the topic.
topicConfig_2_8
: TopicConfig2_8
topicConfig_3
: TopicConfig3
TopicConfig2_8
Topic settings for Apache Kafka® 2.8.
CleanupPolicy
CLEANUP_POLICY_UNSPECIFIED
CLEANUP_POLICY_DELETE
This policy discards log segments when either their retention time or log size limit is reached. See also: [KafkaConfig2_8.log_retention_ms][2] and other similar parameters.
CLEANUP_POLICY_COMPACT
This policy compacts messages in log.
CLEANUP_POLICY_COMPACT_AND_DELETE
This policy uses both compaction and deletion for messages and log segments.
cleanupPolicy
: CleanupPolicy
Retention policy to use on old log messages.
compressionType
: CompressionType
The compression type for a given topic.
deleteRetentionMs
: google.protobuf.Int64Value
The amount of time in milliseconds to retain delete tombstone markers for log compacted topics.
fileDeleteDelayMs
: google.protobuf.Int64Value
The time to wait before deleting a file from the filesystem.
flushMessages
: google.protobuf.Int64Value
The number of messages accumulated on a log partition before messages are flushed to disk.
This setting overrides the cluster-level KafkaConfig2_8.log_flush_interval_messages setting on the topic level.
flushMs
: google.protobuf.Int64Value
The maximum time in milliseconds that a message in the topic is kept in memory before flushed to disk.
This setting overrides the cluster-level KafkaConfig2_8.log_flush_interval_ms setting on the topic level.
minCompactionLagMs
: google.protobuf.Int64Value
The minimum time in milliseconds a message will remain uncompacted in the log.
retentionBytes
: google.protobuf.Int64Value
The maximum size a partition can grow to before Kafka will discard old log segments to free up space if the delete
cleanup_policy is in effect.
It is helpful if you need to control the size of log due to limited disk space.
This setting overrides the cluster-level KafkaConfig2_8.log_retention_bytes setting on the topic level.
retentionMs
: google.protobuf.Int64Value
The number of milliseconds to keep a log segment's file before deleting it.
This setting overrides the cluster-level KafkaConfig2_8.log_retention_ms setting on the topic level.
maxMessageBytes
: google.protobuf.Int64Value
The largest record batch size allowed in topic.
minInsyncReplicas
: google.protobuf.Int64Value
This configuration specifies the minimum number of replicas that must acknowledge a write to topic for the write to be considered successful (when a producer sets acks to "all").
segmentBytes
: google.protobuf.Int64Value
This configuration controls the segment file size for the log. Retention and cleaning is always done a file at a time so a larger segment size means fewer files but less granular control over retention.
This setting overrides the cluster-level KafkaConfig2_8.log_segment_bytes setting on the topic level.
preallocate
: google.protobuf.BoolValue
True if we should preallocate the file on disk when creating a new log segment.
This setting overrides the cluster-level KafkaConfig2_8.log_preallocate setting on the topic level.
TopicConfig3
Topic settings for Apache Kafka® 3.x.
CleanupPolicy
CLEANUP_POLICY_UNSPECIFIED
CLEANUP_POLICY_DELETE
This policy discards log segments when either their retention time or log size limit is reached. See also: [KafkaConfig3.log_retention_ms][10] and other similar parameters.
CLEANUP_POLICY_COMPACT
This policy compacts messages in log.
CLEANUP_POLICY_COMPACT_AND_DELETE
This policy uses both compaction and deletion for messages and log segments.
cleanupPolicy
: CleanupPolicy
Retention policy to use on old log messages.
compressionType
: CompressionType
The compression type for a given topic.
deleteRetentionMs
: google.protobuf.Int64Value
The amount of time in milliseconds to retain delete tombstone markers for log compacted topics.
fileDeleteDelayMs
: google.protobuf.Int64Value
The time to wait before deleting a file from the filesystem.
flushMessages
: google.protobuf.Int64Value
The number of messages accumulated on a log partition before messages are flushed to disk.
This setting overrides the cluster-level KafkaConfig3.log_flush_interval_messages setting on the topic level.
flushMs
: google.protobuf.Int64Value
The maximum time in milliseconds that a message in the topic is kept in memory before flushed to disk.
This setting overrides the cluster-level KafkaConfig3.log_flush_interval_ms setting on the topic level.
minCompactionLagMs
: google.protobuf.Int64Value
The minimum time in milliseconds a message will remain uncompacted in the log.
retentionBytes
: google.protobuf.Int64Value
The maximum size a partition can grow to before Kafka will discard old log segments to free up space if the delete
cleanup_policy is in effect.
It is helpful if you need to control the size of log due to limited disk space.
This setting overrides the cluster-level KafkaConfig3.log_retention_bytes setting on the topic level.
retentionMs
: google.protobuf.Int64Value
The number of milliseconds to keep a log segment's file before deleting it.
This setting overrides the cluster-level KafkaConfig3.log_retention_ms setting on the topic level.
maxMessageBytes
: google.protobuf.Int64Value
The largest record batch size allowed in topic.
minInsyncReplicas
: google.protobuf.Int64Value
This configuration specifies the minimum number of replicas that must acknowledge a write to topic for the write to be considered successful (when a producer sets acks to "all").
segmentBytes
: google.protobuf.Int64Value
This configuration controls the segment file size for the log. Retention and cleaning is always done a file at a time so a larger segment size means fewer files but less granular control over retention.
This setting overrides the cluster-level KafkaConfig3.log_segment_bytes setting on the topic level.
preallocate
: google.protobuf.BoolValue
True if we should preallocate the file on disk when creating a new log segment.
This setting overrides the cluster-level KafkaConfig3.log_preallocate setting on the topic level.
Operation
An Operation resource. For more information, see Operation.
id
: string
ID of the operation.
description
: string
Description of the operation. 0-256 characters long.
createdAt
: google.protobuf.Timestamp
Creation timestamp.
createdBy
: string
ID of the user or service account who initiated the operation.
modifiedAt
: google.protobuf.Timestamp
The time when the Operation resource was last modified.
done
: bool
If the value is false
, it means the operation is still in progress.
If true
, the operation is completed, and either error
or response
is available.
metadata
: google.protobuf.Any
Service-specific metadata associated with the operation. It typically contains the ID of the target resource that the operation is performed on. Any method that returns a long-running operation should document the metadata type, if any.
One of result
The operation result.
If done == false
and there was no failure detected, neither error
nor response
is set.
If done == false
and there was a failure detected, error
is set.
If done == true
, exactly one of error
or response
is set.
error
: google.rpc.Status
The error result of the operation in case of failure or cancellation.
response
: google.protobuf.Any
The normal response of the operation in case of success.
If the original method returns no data on success, such as Delete, the response is google.protobuf.Empty. If the original method is the standard Create/Update, the response should be the target resource of the operation. Any method that returns a long-running operation should document the response type, if any.