snctl, pulsarctl, pulsar-admin, and Terraform.
If you want to update or delete connectors using snctl, pulsarctl, or pulsar-admin, make sure you have set up your client tool. For more information, see set up client tools.
Update a connector
When you want to modify the configuration or update the resources of a connector, you can use any of the tools below. The following example shows how to update the parallelism of the data generator source connector test to 2 using different tools.
- snctl
- pulsarctl
- pulsar-admin
- Terraform
- Console
- REST API
snctl pulsar admin sources update \
--name test \
--parallelism 2
Updated successfully
snctl pulsar admin sources status --name test
{
"numInstances": 2,
"numRunning": 2,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 1799,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 1799,
"lastReceivedTime": 1693946327331,
"workerId": "test"
}
},
{
"instanceId": 1,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 689,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 689,
"lastReceivedTime": 1693946327129,
"workerId": "test"
}
}
]
}
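The status output above can also be checked in a script rather than by eye. The following is a minimal sketch, assuming a POSIX shell with sed available; json_int and source_is_scaled are hypothetical helper names, not part of snctl, pulsarctl, or pulsar-admin. It uses a simple text match instead of a JSON parser, so it only suits output shaped like the examples on this page.

```shell
# Hypothetical helper: print the first integer value of JSON key $1
# found in the text on stdin. Tolerates both `"key": 2` and `"key" : 2`.
json_int() {
  sed -n 's/.*"'"$1"'"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Hypothetical helper: read a `sources status` document on stdin and
# succeed only when numInstances and numRunning both equal $1.
source_is_scaled() {
  expected="$1"
  status_json=$(cat)
  instances=$(printf '%s\n' "$status_json" | json_int numInstances)
  running=$(printf '%s\n' "$status_json" | json_int numRunning)
  [ "$instances" = "$expected" ] && [ "$running" = "$expected" ]
}
```

For example, `snctl pulsar admin sources status --name test | source_is_scaled 2 && echo "parallelism is 2"` prints the message only when both counters report 2.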
pulsarctl sources update \
--name test \
--parallelism 2
Updated successfully
pulsarctl sources status --name test
{
"numInstances": 2,
"numRunning": 2,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 1799,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 1799,
"lastReceivedTime": 1693946327331,
"workerId": "test"
}
},
{
"instanceId": 1,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 689,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 689,
"lastReceivedTime": 1693946327129,
"workerId": "test"
}
}
]
}
pulsar-admin \
--admin-url "${WEB_SERVICE_URL}" \
--auth-plugin org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2 \
--auth-params '{"privateKey":"file:///YOUR-KEY-FILE-PATH",
"issuerUrl":"https://auth.streamnative.cloud/",
"audience":"urn:sn:pulsar:${orgName}:${instanceName}"}' \
sources update \
--name test \
--parallelism 2
- admin-url: the HTTP service URL of your Pulsar cluster.
- privateKey: the path to the downloaded OAuth2 key file.
- audience: the Uniform Resource Name (URN), which is a combination of urn:sn:pulsar, your organization name, and your Pulsar instance name.
- ${orgName}: the name of your organization.
- ${instanceName}: the name of your instance.
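Because the --auth-params value is single-quoted, the shell does not expand ${orgName} and ${instanceName}; they have to be replaced by hand or the string assembled beforehand. The following is a minimal sketch of the second approach. The variable names ORG_NAME, INSTANCE_NAME, and KEY_FILE, their values, and the build_auth_params helper are all assumptions made for illustration.

```shell
# Assumed placeholder values; replace them with your own.
ORG_NAME="my-org"
INSTANCE_NAME="my-instance"
KEY_FILE="/path/to/key-file.json"   # absolute path to the OAuth2 key file

# Hypothetical helper: print the --auth-params JSON with the values filled in.
build_auth_params() {
  printf '{"privateKey":"file://%s","issuerUrl":"https://auth.streamnative.cloud/","audience":"urn:sn:pulsar:%s:%s"}' \
    "$KEY_FILE" "$ORG_NAME" "$INSTANCE_NAME"
}

AUTH_PARAMS=$(build_auth_params)
```

The result can then be passed to pulsar-admin as `--auth-params "$AUTH_PARAMS"` in place of the single-quoted literal.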
Updated successfully
pulsar-admin sources status --name test
{
"numInstances" : 2,
"numRunning" : 2,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 1287,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 1287,
"lastReceivedTime" : 1693946605095,
"workerId" : "test"
}
}, {
"instanceId" : 1,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceivedFromSource" : 909,
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"numSourceExceptions" : 0,
"latestSourceExceptions" : [ ],
"numWritten" : 909,
"lastReceivedTime" : 1693946605023,
"workerId" : "test"
}
} ]
}
To update the submitted connector, edit the Terraform file and then run the following command.
# update the main.tf file
terraform apply
# output
pulsar_source.test: Refreshing state... [id=public/default/dg-test-tf]
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
~ update in-place
Terraform will perform the following actions:
# pulsar_source.test will be updated in-place
~ resource "pulsar_source" "test" {
~ archive = "connectors/pulsar-io-data-generator-2.9.2.17.nar" -> "builtin://data-generator"
- custom_runtime_options = jsonencode(
{
- clusterName = "test"
- managed = true
- maxReplicas = 0
- outputTypeClassName = "org.apache.pulsar.io.datagenerator.Person"
- serviceAccountName = "test-function-pulsarcluster"
}
) -> null
id = "public/default/dg-test-tf"
name = "dg-test-tf"
~ parallelism = 1 -> 2
# (10 unchanged attributes hidden)
}
Plan: 0 to add, 1 to change, 0 to destroy.
Do you want to perform these actions?
Terraform will perform the actions described above.
Only 'yes' will be accepted to approve.
Enter a value: yes
pulsar_source.test: Modifying... [id=public/default/dg-test-tf]
pulsar_source.test: Modifications complete after 1s [id=public/default/dg-test-tf]
Apply complete! Resources: 0 added, 1 changed, 0 destroyed.
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Created Sources tab.
- Click the ellipsis at the end of the row of the connector, and then click Edit.
- Edit the configuration that you want to change, and click SUBMIT.
curl -X PUT https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test \
-H 'Authorization: Bearer <API Key>' \
-H "Content-Type: multipart/form-data" \
-F 'sourceConfig={"name": "test", "tenant": "public", "namespace": "default", "parallelism": 2};type=application/json'
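The request path and the sourceConfig form field in the PUT request both carry the tenant, namespace, and connector name, so they are easy to get out of sync. The following is a minimal sketch that builds both from the same arguments; source_path and source_config_field are hypothetical helper names, and the JSON body mirrors only the fields used in the request above.

```shell
# Hypothetical helper: print the admin REST path for a source connector.
# Arguments: tenant, namespace, connector name.
source_path() {
  printf '/admin/v3/sources/%s/%s/%s' "$1" "$2" "$3"
}

# Hypothetical helper: print the sourceConfig form field for a parallelism update.
# Arguments: tenant, namespace, connector name, parallelism.
source_config_field() {
  printf 'sourceConfig={"name": "%s", "tenant": "%s", "namespace": "%s", "parallelism": %d};type=application/json' \
    "$3" "$1" "$2" "$4"
}
```

With these helpers, the URL becomes `"https://${WEB_SERVICE_URL}$(source_path public default test)"` and the form field becomes `-F "$(source_config_field public default test 2)"`.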
curl -X GET https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test/status \
--header 'Authorization: Bearer <API Key>' | jq '.'
{
"numInstances": 2,
"numRunning": 2,
"instances": [
{
"instanceId": 0,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 1799,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 1799,
"lastReceivedTime": 1693946327331,
"workerId": "test"
}
},
{
"instanceId": 1,
"status": {
"running": true,
"error": "",
"numRestarts": 0,
"numReceivedFromSource": 689,
"numSystemExceptions": 0,
"latestSystemExceptions": [],
"numSourceExceptions": 0,
"latestSourceExceptions": [],
"numWritten": 689,
"lastReceivedTime": 1693946327129,
"workerId": "test"
}
}
]
}
Delete a connector
The following example shows how to delete the data generator source connector test using different tools.
- snctl
- pulsarctl
- pulsar-admin
- Terraform
- Console
- REST API
To delete the source connector test, use the following command.
snctl pulsar admin sources delete --tenant public --namespace default --name test
You should see the following output:
Deleted test successfully
If you want to verify whether the source connector has been deleted successfully, run the following command.
snctl pulsar admin sources get --tenant public --namespace default --name test
You should see the following output:
[✖] code: 500 reason: failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-XXXXX" not found
To delete the source connector test, use the following command.
pulsarctl sources delete --tenant public --namespace default --name test
You should see the following output:
Deleted test successfully
If you want to verify whether the source connector has been deleted successfully, run the following command.
pulsarctl sources get --tenant public --namespace default --name test
You should see the following output:
[✖] code: 500 reason: failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-XXXXX" not found
To delete the source connector test, run the following command.
./bin/pulsar-admin sources delete --tenant public --namespace default --name test
You should see the following output:
Delete source successfully
To verify the source connector has been deleted, run the following command.
./bin/pulsar-admin sources get --tenant public --namespace default --name test
You should see the following output:
failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io "test-e9ef0ca6" not found
To delete the source connector test with Terraform, run the following command and type yes at the prompt.
terraform destroy
# output
pulsar_source.test: Refreshing state... [id=public/default/dg-test-tf]
Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
- destroy
Terraform will perform the following actions:
# pulsar_source.test will be destroyed
- resource "pulsar_source" "test" {
- archive = "connectors/pulsar-io-data-generator-2.9.2.17.nar" -> null
- classname = "org.apache.pulsar.io.datagenerator.DataGeneratorSource" -> null
- configs = jsonencode(
{
- sleepBetweenMessages = "60"
}
) -> null
- cpu = 1 -> null
- custom_runtime_options = jsonencode(
{
- clusterName = "test"
- managed = true
- maxReplicas = 0
- outputTypeClassName = "org.apache.pulsar.io.datagenerator.Person"
- serviceAccountName = "test-function-pulsarcluster"
}
) -> null
- destination_topic_name = "public/default/dg-test" -> null
- disk_mb = 10240 -> null
- id = "public/default/dg-test-tf" -> null
- name = "dg-test-tf" -> null
- namespace = "default" -> null
- parallelism = 2 -> null
- processing_guarantees = "ATMOST_ONCE" -> null
- ram_mb = 1024 -> null
- tenant = "public" -> null
- use_thread_local_producers = false -> null
}
Plan: 0 to add, 0 to change, 1 to destroy.
Do you really want to destroy all resources?
Terraform will destroy all your managed infrastructure, as shown above.
There is no undo. Only 'yes' will be accepted to confirm.
Enter a value: yes
pulsar_source.test: Destroying... [id=public/default/dg-test-tf]
pulsar_source.test: Destruction complete after 1s
Destroy complete! Resources: 1 destroyed.
- On the left navigation pane of StreamNative Console, under Resources, click Connectors.
- On the Connectors page, select the Created Sources tab.
- Click the ellipsis at the end of the row of the connector, and then click Delete.
- Enter the connector name and then click Confirm.
To delete the source connector test, use the following command.
curl -X DELETE https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test \
--header 'Authorization: Bearer <API Key>'
If no error is returned, the deletion succeeded.
If you want to verify whether the source connector has been deleted successfully, run the following command.
curl -X GET https://${WEB_SERVICE_URL}/admin/v3/sources/public/default/test/status \
--header 'Authorization: Bearer <API Key>' | jq '.'
You should see the following output:
{"reason":"failed to perform the request: responseCode: 404, responseMessage: sources.compute.functionmesh.io \"test-6b51d8ef\" not found"}
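Every verification output in this section contains the text responseCode: 404, so a script can treat that text as confirmation that the connector is gone. The following is a minimal sketch under that assumption; source_is_gone is a hypothetical helper name, and matching on the message text is only as stable as the message itself.

```shell
# Hypothetical helper: succeed when the text on stdin reports that the
# connector was not found (responseCode: 404), as in the outputs above.
source_is_gone() {
  grep -q 'responseCode: 404'
}
```

For example, `pulsarctl sources get --tenant public --namespace default --name test 2>&1 | source_is_gone && echo "deleted"`. The `2>&1` redirect is a precaution in case the tool writes the error to standard error.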