분산모드 커넥트로 실행
bin/connect-distributed config/connect-distributed.properties
분산모드의 경우 사용하는 커넥터의 설정파일을 같이 세팅하지않고, 별도로 스타트를 함
Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.
# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=my-kafka:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083
# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=
# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
plugin.path=C:/Ayeong/kafka_2.12-2.5.0/plugins/camel-postgresql-sink-kafka-connector
CamelPostgrdsqlSinkConnector을 jar 파일들의 위치를 plugin.path에 추가해주고, group_id를 맞추어 지정해준다.
curl -X GET [<http://localhost:8083/connector-plugins>](<http://localhost:8083/connector-plugins>)
이 명령어를 통해서 커넥터 플러그인 리스트를 받아왔을때,제대로 CamelSinkConnector가 추가되어 있음.
[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"3.18.2"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"3.18.2"},{"class":"org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector","type":"sink","version":"3.18.2"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
curl -X POST '<http://localhost:8083/connectors>' --header 'Content-Type:application/json' --data '{
"name": "CamelPostgresql-sinkSinkConnector",
"config": {
"connector.class": "org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector",
"camel.kamelet.postgresql-source.databaseName":"postgres",
"camel.kamelet.postgresql-source.password":"d1230102",
"camel.kamelet.postgresql-source.serverName":"localhost",
"camel.kamelet.postgresql-source.username":"postgres",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"topics":"deviceIfo"
}
}'
커넥터를 실행하기위해 postgre와 연결하기위한 커넥터 작업을 진행하였는데, 이 부분에서 아래와같은 에러가 발생하였다. 동일한 에러가 발생한 사람의 stackOverflow를 보았지만, org/apache/camel/Exchange 가 없으니, jctools.jar 파일을 설치해보라는 얘기가 있어, 별도로 다운받아 기존의 jar 파일과 교체하였지만 그럼에도 불구하고 해결이 되지 않았다.
WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:767)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: org/apache/camel/Exchange
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:408)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:365)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:318)
at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:760)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:547)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1607)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1297)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1577)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
at org.eclipse.jetty.server.Server.handle(Server.java:500)
at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: org/apache/camel/Exchange
at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
... 30 more
Caused by: java.lang.NoClassDefFoundError: org/apache/camel/Exchange
at org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.<clinit>(CamelSinkConnectorConfig.java:43)
at org.apache.camel.kafkaconnector.CamelSinkConnector.config(CamelSinkConnector.java:67)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:334)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$5.call(DistributedHerder.java:752)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$5.call(DistributedHerder.java:749)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:349)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:289)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.camel.Exchange
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 13 more