분산모드 커넥트로 실행

bin/connect-distributed config/connect-distributed.properties

분산모드의 경우 사용하는 커넥터의 설정파일을 같이 세팅하지않고, 별도로 스타트를 함

Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=my-kafka:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples:
plugin.path=C:/Ayeong/kafka_2.12-2.5.0/plugins/camel-postgresql-sink-kafka-connector

CamelPostgrdsqlSinkConnector을 jar 파일들의 위치를 plugin.path에 추가해주고, group_id를 맞추어 지정해준다.

curl -X GET [<http://localhost:8083/connector-plugins>](<http://localhost:8083/connector-plugins>)

이 명령어를 통해서 커넥터 플러그인 리스트를 받아왔을때,제대로 CamelSinkConnector가 추가되어 있음.


[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"3.18.2"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"3.18.2"},{"class":"org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector","type":"sink","version":"3.18.2"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

PostgreSql을 연결하기위한 정보전송

curl -X POST '<http://localhost:8083/connectors>'  --header 'Content-Type:application/json'  --data '{
     "name": "CamelPostgresql-sinkSinkConnector",
     "config": {
        "connector.class": "org.apache.camel.kafkaconnector.postgresqlsink.CamelPostgresqlsinkSinkConnector",
        "camel.kamelet.postgresql-source.databaseName":"postgres",
        "camel.kamelet.postgresql-source.password":"d1230102",
        "camel.kamelet.postgresql-source.serverName":"localhost",
        "camel.kamelet.postgresql-source.username":"postgres",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "topics":"deviceIfo"
    }
}'

커넥터를 실행하기위해 postgre와 연결하기위한 커넥터 작업을 진행하였는데, 이 부분에서 아래와같은 에러가 발생하였다. 동일한 에러가 발생한 사람의 stackOverflow를 보았지만, org/apache/camel/Exchange 가 없으니, jctools.jar 파일을 설치해보라는 얘기가 있어, 별도로 다운받아 기존의 jar 파일과 교체하였지만 그럼에도 불구하고 해결이 되지 않았다.

WARN unhandled due to prior sendError (org.eclipse.jetty.server.HttpChannelState:767)
javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: org/apache/camel/Exchange
        at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:408)
        at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:365)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:318)
        at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:760)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:547)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
        at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1607)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
        at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1297)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
        at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1577)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
        at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
        at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:500)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoClassDefFoundError: org/apache/camel/Exchange
        at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:254)
        at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:236)
        at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:436)
        at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:261)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
        at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
        at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
        at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
        at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
        ... 30 more
Caused by: java.lang.NoClassDefFoundError: org/apache/camel/Exchange
        at org.apache.camel.kafkaconnector.CamelSinkConnectorConfig.<clinit>(CamelSinkConnectorConfig.java:43)
        at org.apache.camel.kafkaconnector.CamelSinkConnector.config(CamelSinkConnector.java:67)
        at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
        at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:334)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$5.call(DistributedHerder.java:752)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$5.call(DistributedHerder.java:749)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:349)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:289)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.camel.Exchange
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 13 more

Connector 사용 중단 이유