diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java new file mode 100644 index 000000000000..5f20301d458d --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListener.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +/** + * Interface for event listener plugin implementations. + */ +public interface OMEventListener { + + void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext); + + void start(); + + void shutdown(); +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java new file mode 100644 index 000000000000..06ddd45f1f2e --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContext.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; + +/** + * A narrow set of functionality we are ok with exposing to plugin + * implementations. + */ +public interface OMEventListenerPluginContext { + + boolean isLeaderReady(); + + // TODO: should we allow plugins to pass in maxResults or just limit + // them to some predefined value for safety? e.g. 10K + List listCompletedRequestInfo(String startKey, int maxResults) throws IOException; + + // XXX: this probably doesn't belong here + String getThreadNamePrefix(); +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java new file mode 100644 index 000000000000..dbda5e337cce --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes for the OM Event Listener implementation. + */ +package org.apache.hadoop.ozone.om.eventlistener; diff --git a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt index fda1e61820a9..2e12ec5d1c7e 100644 --- a/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt +++ b/hadoop-ozone/dist/src/main/license/bin/LICENSE.txt @@ -262,6 +262,7 @@ CDDL 1.1 + GPLv2 with classpath exception Apache License 2.0 ===================== + at.yawk.lz4:lz4-java ch.qos.reload4j:reload4j com.amazonaws:aws-java-sdk-core com.amazonaws:aws-java-sdk-kms @@ -384,6 +385,7 @@ Apache License 2.0 org.apache.hadoop:hadoop-shaded-guava org.apache.hadoop:hadoop-shaded-protobuf_3_25 org.apache.httpcomponents:httpcore + org.apache.kafka:kafka-clients org.apache.kerby:kerb-admin org.apache.kerby:kerb-client org.apache.kerby:kerb-common diff --git a/hadoop-ozone/dist/src/main/license/jar-report.txt b/hadoop-ozone/dist/src/main/license/jar-report.txt index 94827486c8ef..e40eea85e85a 100644 --- a/hadoop-ozone/dist/src/main/license/jar-report.txt +++ b/hadoop-ozone/dist/src/main/license/jar-report.txt @@ -159,6 +159,7 @@ share/ozone/lib/json-simple.jar share/ozone/lib/jsp-api.jar share/ozone/lib/jspecify.jar share/ozone/lib/jsr311-api.jar +share/ozone/lib/kafka-clients.jar share/ozone/lib/kerb-core.jar share/ozone/lib/kerb-crypto.jar share/ozone/lib/kerb-util.jar @@ -170,6 +171,7 @@ share/ozone/lib/kotlin-stdlib.jar share/ozone/lib/listenablefuture-empty-to-avoid-conflict-with-guava.jar share/ozone/lib/log4j-api.jar share/ozone/lib/log4j-core.jar +share/ozone/lib/lz4-java.jar share/ozone/lib/metrics-core.jar share/ozone/lib/netty-buffer.Final.jar share/ozone/lib/netty-codec.Final.jar @@ -227,6 +229,7 @@ share/ozone/lib/ozone-insight.jar share/ozone/lib/ozone-interface-client.jar share/ozone/lib/ozone-interface-storage.jar share/ozone/lib/ozone-manager.jar +share/ozone/lib/ozone-manager-plugins.jar share/ozone/lib/ozone-multitenancy-ranger.jar share/ozone/lib/ozone-reconcodegen.jar share/ozone/lib/ozone-recon.jar diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java index abd63f09c0e5..da07cc2b2c03 100644 --- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java +++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java @@ -345,6 +345,17 @@ ListSnapshotResponse listSnapshot( List listVolumes(String userName, String prefix, String startKey, int maxKeys) throws IOException; + /** + * Returns a list of operation info objects. + * + * @param startKey the start key determines where to start listing + * from, this key is excluded from the result. + * @param maxResults the maximum number of results to return. + * @return a list of {@link OmCompletedRequestInfo} + * @throws IOException + */ + List listCompletedRequestInfo(String startKey, int maxResults) throws IOException; + /** * Returns the names of up to {@code count} open keys whose age is * greater than or equal to {@code expireThreshold}. diff --git a/hadoop-ozone/ozone-manager-plugins/pom.xml b/hadoop-ozone/ozone-manager-plugins/pom.xml new file mode 100644 index 000000000000..2c804fded5da --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/pom.xml @@ -0,0 +1,77 @@ + + + + 4.0.0 + + org.apache.ozone + ozone + 2.2.0-SNAPSHOT + + ozone-manager-plugins + 2.2.0-SNAPSHOT + jar + Apache Ozone Manager Plugins + + false + UTF-8 + + + + + com.google.guava + guava + + + org.apache.hadoop + hadoop-common + + + org.apache.kafka + kafka-clients + + + org.apache.ozone + hdds-common + + + org.apache.ozone + hdds-server-framework + + + org.apache.ozone + ozone-common + + + org.apache.ozone + ozone-interface-client + + + org.slf4j + slf4j-api + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + none + + + + + diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java new file mode 100644 index 000000000000..8dc31dc925e0 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerKafkaPublisher.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is an implementation of OMEventListener which uses the + * OMEventListenerLedgerPoller as a building block to periodically poll/consume + * completed operations, serialize them to a S3 schema and produce them + * to a kafka topic. + */ +public class OMEventListenerKafkaPublisher implements OMEventListener { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerKafkaPublisher.class); + + private static final String KAFKA_CONFIG_PREFIX = "ozone.notify.kafka."; + private static final int COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE = 1; + + private OMEventListenerLedgerPoller ledgerPoller; + private KafkaClientWrapper kafkaClient; + private OMEventListenerLedgerPollerSeekPosition seekPosition; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + Map kafkaPropsMap = conf.getPropsMatchPrefixAndTrimPrefix(KAFKA_CONFIG_PREFIX); + Properties kafkaProps = new Properties(); + kafkaProps.putAll(kafkaPropsMap); + + this.kafkaClient = new KafkaClientWrapper(kafkaProps); + + // TODO: these constants should be read from config + long kafkaServiceInterval = 2 * 1000; + long kafkaServiceTimeout = 300 * 1000; + + LOG.info("Creating OMEventListenerLedgerPoller with serviceInterval={}," + + "serviceTimeout={}, kafkaProps={}, seekPosition={}", + kafkaServiceInterval, kafkaServiceTimeout, kafkaProps, + seekPosition); + + this.seekPosition = new OMEventListenerLedgerPollerSeekPosition(); + + this.ledgerPoller = new OMEventListenerLedgerPoller( + kafkaServiceInterval, TimeUnit.MILLISECONDS, + COMPLETED_REQUEST_CONSUMER_CORE_POOL_SIZE, + kafkaServiceTimeout, pluginContext, conf, + seekPosition, + this::handleCompletedRequest); + } + + @Override + public void start() { + ledgerPoller.start(); + + try { + kafkaClient.initialize(); + } catch (IOException ex) { + LOG.error("Failure initializing kafka client", ex); + } + } + + @Override + public void shutdown() { + try { + kafkaClient.shutdown(); + } catch (IOException ex) { + LOG.error("Failure shutting down kafka client", ex); + } + + ledgerPoller.shutdown(); + } + + // callback called by OMEventListenerLedgerPoller + public void handleCompletedRequest(OmCompletedRequestInfo completedRequestInfo) { + LOG.info("Processing {}", completedRequestInfo); + + // stub event until we implement a strategy to convert the events to + // a user facing schema (e.g. S3) + String event = String.format("{\"key\":\"%s/%s/%s\", \"type\":\"%s\"}", + completedRequestInfo.getVolumeName(), + completedRequestInfo.getBucketName(), + completedRequestInfo.getKeyName(), + String.valueOf(completedRequestInfo.getCmdType())); + + LOG.info("Sending {}", event); + + try { + kafkaClient.send(event); + } catch (IOException ex) { + LOG.error("Failure to send event {}", event, ex); + return; + } + + // we can update the seek position + seekPosition.set(String.valueOf(completedRequestInfo.getTrxLogIndex())); + } + + static class KafkaClientWrapper { + public static final Logger LOG = LoggerFactory.getLogger(KafkaClientWrapper.class); + + private final String topic; + private final Properties kafkaProps; + + private KafkaProducer producer; + + KafkaClientWrapper(Properties kafkaProps) { + this.topic = (String) kafkaProps.get("topic"); + this.kafkaProps = kafkaProps; + } + + public void initialize() throws IOException { + LOG.info("Initializing with properties {}", kafkaProps); + this.producer = new KafkaProducer<>(kafkaProps); + + ensureTopicExists(); + } + + public void shutdown() throws IOException { + producer.close(); + } + + public void send(String message) throws IOException { + if (producer != null) { + LOG.info("Producing event {}", message); + ProducerRecord producerRecord = + new ProducerRecord<>(topic, message); + producer.send(producerRecord); + } else { + LOG.warn("Producing event {} [KAFKA DOWN]", message); + } + } + + private void ensureTopicExists() { + try (AdminClient adminClient = AdminClient.create(kafkaProps)) { + LOG.info("Creating kafka topic: {}", this.topic); + NewTopic newTopic = new NewTopic(this.topic, 1, (short) 1); + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + adminClient.close(); + } catch (Exception ex) { + LOG.error("Failed to create topic: {}", this.topic, ex); + } + } + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java new file mode 100644 index 000000000000..a8148601650d --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPoller.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a helper class which can be used by implementations of + * OMEventListener which uses a background service to read the latest + * completed operations and hand them to a callback method. + */ +public class OMEventListenerLedgerPoller extends BackgroundService { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPoller.class); + + private static final int MAX_RESULTS = 10_000; + + private final AtomicBoolean suspended; + private final AtomicLong runCount; + private final AtomicLong successRunCount; + private final OMEventListenerPluginContext pluginContext; + private final OMEventListenerLedgerPollerSeekPosition seekPosition; + private final Consumer callback; + + @SuppressWarnings("checkstyle:ParameterNumber") + public OMEventListenerLedgerPoller(long interval, TimeUnit unit, + int poolSize, long serviceTimeout, + OMEventListenerPluginContext pluginContext, + OzoneConfiguration configuration, + OMEventListenerLedgerPollerSeekPosition seekPosition, + Consumer callback) { + + super("OMEventListenerLedgerPoller", + interval, + TimeUnit.MILLISECONDS, + poolSize, + serviceTimeout, pluginContext.getThreadNamePrefix()); + + this.suspended = new AtomicBoolean(false); + this.runCount = new AtomicLong(0); + this.successRunCount = new AtomicLong(0); + this.pluginContext = pluginContext; + this.seekPosition = seekPosition; + this.callback = callback; + } + + private boolean shouldRun() { + return pluginContext.isLeaderReady() && !suspended.get(); + } + + /** + * Suspend the service. + */ + @VisibleForTesting + public void suspend() { + suspended.set(true); + } + + /** + * Resume the service if suspended. + */ + @VisibleForTesting + public void resume() { + suspended.set(false); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + queue.add(new OMEventListenerLedgerPoller.CompletedRequestInfoConsumerTask()); + return queue; + } + + public AtomicLong getRunCount() { + return runCount; + } + + private class CompletedRequestInfoConsumerTask implements BackgroundTask { + + @Override + public int getPriority() { + return 0; + } + + @Override + public BackgroundTaskResult call() { + if (shouldRun()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Running OMEventListenerLedgerPoller"); + } + if (runCount.get() == 0) { + seekPosition.initSeekPosition(); + } + getRunCount().incrementAndGet(); + + try { + for (OmCompletedRequestInfo requestInfo : pluginContext.listCompletedRequestInfo( + seekPosition.get(), MAX_RESULTS)) { + callback.accept(requestInfo); + } + successRunCount.incrementAndGet(); + } catch (IOException e) { + LOG.error("Error while running completed operation consumer " + + "background task. Will retry at next run.", e); + } + } else { + runCount.set(0); + } + + // place holder by returning empty results of this call back. + return BackgroundTaskResult.EmptyTaskResult.newResult(); + } + } + + public long getSuccessfulRunCount() { + return successRunCount.get(); + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java new file mode 100644 index 000000000000..c93a22175393 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerLedgerPollerSeekPosition.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a helper class to get/set the seek position used by the + * OMEventListenerLedgerPoller. + * + * XXX: the seek position should be persisted (and ideally distrbuted to + * all OMs) but at the moment it only lives in memory + */ +public class OMEventListenerLedgerPollerSeekPosition { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerLedgerPollerSeekPosition.class); + + private final AtomicReference seekPosition; + + public OMEventListenerLedgerPollerSeekPosition() { + this.seekPosition = new AtomicReference(initSeekPosition()); + } + + // TODO: load this from persistent storage + public String initSeekPosition() { + return null; + } + + public String get() { + return seekPosition.get(); + } + + public void set(String val) { + LOG.debug("Setting seek position {}", val); + // NOTE: this in-memory view of the seek position needs to be kept + // up to date because the OMEventListenerLedgerPoller has a + // reference to it + seekPosition.set(val); + } +} diff --git a/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java new file mode 100644 index 000000000000..dbda5e337cce --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes for the OM Event Listener implementation. + */ +package org.apache.hadoop.ozone.om.eventlistener; diff --git a/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java new file mode 100644 index 000000000000..f35d9999f322 --- /dev/null +++ b/hadoop-ozone/ozone-manager-plugins/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerKafkaPublisher.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; +import org.apache.hadoop.util.Time; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OMEventListenerPluginManager}. + */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerKafkaPublisher { + + private static final String VOLUME_NAME = "vol1"; + private static final String BUCKET_NAME = "bucket1"; + + @Mock + private OMEventListenerPluginContext pluginContext; + + // helper to create json key/val string for non exhaustive JSON + // attribute checking + private static String toJsonKeyVal(String key, String val) { + return new StringBuilder() + .append('\"') + .append(key) + .append('\"') + .append(':') + .append('\"') + .append(val) + .append('\"') + .toString(); + } + + private static OmCompletedRequestInfo buildCompletedRequestInfo( + long trxLogIndex, Type cmdType, String keyName, OperationArgs opArgs) { + + return new OmCompletedRequestInfo.Builder() + .setTrxLogIndex(trxLogIndex) + .setCmdType(cmdType) + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setKeyName(keyName) + .setCreationTime(Time.now()) + .setOpArgs(new OperationArgs.NoArgs()) + .build(); + } + + private List captureEventsProducedByOperation(OmCompletedRequestInfo op, int expectEvents) + throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.notify.kafka.topic", "abc"); + + List events = new ArrayList<>(); + + OMEventListenerKafkaPublisher plugin = new OMEventListenerKafkaPublisher(); + try (MockedConstruction mockeKafkaClientWrapper = + mockConstruction(OMEventListenerKafkaPublisher.KafkaClientWrapper.class)) { + + plugin.initialize(conf, pluginContext); + plugin.handleCompletedRequest(op); + + OMEventListenerKafkaPublisher.KafkaClientWrapper mock = mockeKafkaClientWrapper.constructed().get(0); + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + verify(mock, times(expectEvents)).send(argument.capture()); + + events.addAll(argument.getAllValues()); + } + + return events; + } + + @Test + public void testCreateKeyRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(1L, Type.CreateKey, "some/key1", + new OperationArgs.NoArgs()); + + List events = captureEventsProducedByOperation(createRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key1")) + .contains(toJsonKeyVal("type", "CreateKey")); + } + + @Test + public void testCreateFileRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + boolean recursive = false; + boolean overwrite = true; + + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(2L, Type.CreateFile, "some/key2", + new OperationArgs.CreateFileArgs(recursive, overwrite)); + + List events = captureEventsProducedByOperation(createRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key2")) + .contains(toJsonKeyVal("type", "CreateFile")); + } + + @Test + public void testCreateDirectoryRequestProducesS3CreatedEvent() throws InterruptedException, IOException { + OmCompletedRequestInfo createRequest = buildCompletedRequestInfo(3L, Type.CreateDirectory, "some/key3", + new OperationArgs.NoArgs()); + + List events = captureEventsProducedByOperation(createRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key3")) + .contains(toJsonKeyVal("type", "CreateDirectory")); + } + + @Test + public void testRenameRequestProducesS3CreateAndDeleteEvents() throws InterruptedException, IOException { + OmCompletedRequestInfo renameRequest = buildCompletedRequestInfo(4L, Type.RenameKey, "some/key4", + new OperationArgs.RenameKeyArgs("some/key_RENAMED")); + + List events = captureEventsProducedByOperation(renameRequest, 1); + assertThat(events).hasSize(1); + + assertThat(events.get(0)) + .contains(toJsonKeyVal("key", "vol1/bucket1/some/key4")) + .contains(toJsonKeyVal("type", "RenameKey")); + } +} diff --git a/hadoop-ozone/ozone-manager/pom.xml b/hadoop-ozone/ozone-manager/pom.xml index 83dacbe4eb35..2ef5a410ff0a 100644 --- a/hadoop-ozone/ozone-manager/pom.xml +++ b/hadoop-ozone/ozone-manager/pom.xml @@ -243,6 +243,11 @@ netty-tcnative-boringssl-static runtime + + org.apache.ozone + ozone-manager-plugins + runtime + diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java index df3af67bc3bd..3cc004137efc 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java @@ -1340,6 +1340,58 @@ private List listAllVolumes(String prefix, String startKey, return result; } + /** + * {@inheritDoc} + */ + @Override + public List listCompletedRequestInfo(final String startKey, + final int maxResults) + throws IOException { + List results = new ArrayList<>(); + + Table.KeyValue completedRequestInfoRow; + try (TableIterator> + tableIterator = getCompletedRequestInfoTable().iterator()) { + + boolean skipFirst = false; + if (StringUtils.isNotBlank(startKey)) { + // TODO: what happens if the seek position is no longer + // available? Do we go to the end of the list + // or the first key > startKey + tableIterator.seek(Long.valueOf(startKey)); + skipFirst = true; + } + + while (tableIterator.hasNext() && results.size() < maxResults) { + completedRequestInfoRow = tableIterator.next(); + // this is the first loop iteration after the seek so we + // need to skip the record we seeked to (if it is still + // present) + if (skipFirst) { + skipFirst = false; + // NOTE: I'm assuming that we need to conditionally do this + // only if it is equal to what we wanted to seek to (hence + if (!Objects.equals(startKey, completedRequestInfoRow.getKey())) { + // when we have a startKey we expect the first result + // to be that startKey. If it is not then we can infer that + // the startKey was already cleaned up and therefore we have + // missed some records somehow and this needs flagged to the + // caller. + // TODO: we should throw a custom exception here (instead of + // IOException) that needs to be handled appropriately by + // callers + throw new IOException( + "Missing rows - start key not found (startKey=" + startKey + + ", foundKey=" + completedRequestInfoRow.getKey() + ")"); + } + } else { + results.add(completedRequestInfoRow.getValue()); + } + } + } + return results; + } + private PersistedUserVolumeInfo getVolumesByUser(String userNameKey) throws OMException { try { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java new file mode 100644 index 000000000000..1c484a2d1720 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginContextImpl.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo; + +/** + * A narrow set of functionality we are ok with exposing to plugin + * implementations. + */ +public final class OMEventListenerPluginContextImpl implements OMEventListenerPluginContext { + private final OzoneManager ozoneManager; + + public OMEventListenerPluginContextImpl(OzoneManager ozoneManager) { + this.ozoneManager = ozoneManager; + } + + @Override + public boolean isLeaderReady() { + return ozoneManager.isLeaderReady(); + } + + // TODO: should we allow plugins to pass in maxResults or just limit + // them to some predefined value for safety? e.g. 10K + @Override + public List listCompletedRequestInfo(String startKey, int maxResults) throws IOException { + return ozoneManager.getMetadataManager().listCompletedRequestInfo(startKey, maxResults); + } + + // TODO: it feels like this doesn't belong here + @Override + public String getThreadNamePrefix() { + return ozoneManager.getThreadNamePrefix(); + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java new file mode 100644 index 000000000000..e4331206b056 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/OMEventListenerPluginManager.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a manager for plugins which implement OMEventListener which + * manages the lifecycle of constructing starting/stopping configured + * plugins. + */ +public class OMEventListenerPluginManager { + public static final Logger LOG = LoggerFactory.getLogger(OMEventListenerPluginManager.class); + + public static final String PLUGIN_DEST_BASE = "ozone.om.plugin.destination"; + + private final List plugins; + + public OMEventListenerPluginManager(OzoneManager ozoneManager, OzoneConfiguration conf) { + this.plugins = loadAll(ozoneManager, conf); + } + + public List getLoaded() { + return plugins; + } + + public void startAll() { + for (OMEventListener plugin : plugins) { + plugin.start(); + } + } + + public void shutdownAll() { + for (OMEventListener plugin : plugins) { + plugin.shutdown(); + } + } + + // Configuration is based on ranger plugins + // + // For example, a plugin named FooPlugin would be configured via + // OzoneConfiguration properties as follows: + // + // conf.set("ozone.om.plugin.destination.foo", "enabled"); + // conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + // + static List loadAll(OzoneManager ozoneManager, OzoneConfiguration conf) { + List plugins = new ArrayList<>(); + + Map props = conf.getPropsMatchPrefixAndTrimPrefix(PLUGIN_DEST_BASE); + List destNameList = new ArrayList<>(); + for (Map.Entry entry : props.entrySet()) { + String destName = entry.getKey(); + String value = entry.getValue(); + LOG.info("Found event listener plugin with name={} and value={}", destName, value); + + if (value.equalsIgnoreCase("enable") || value.equalsIgnoreCase("enabled") || value.equalsIgnoreCase("true")) { + destNameList.add(destName); + LOG.info("Event listener plugin {}{} is set to {}", PLUGIN_DEST_BASE, destName, value); + } + } + + OMEventListenerPluginContext pluginContext = new OMEventListenerPluginContextImpl(ozoneManager); + + for (String destName : destNameList) { + try { + Class cls = resolvePluginClass(conf, destName); + LOG.info("Event listener plugin class is {}", cls); + + OMEventListener impl = cls.newInstance(); + impl.initialize(conf, pluginContext); + + plugins.add(impl); + } catch (Exception ex) { + LOG.error("Can't make instance of event listener plugin {}{}", PLUGIN_DEST_BASE, destName, ex); + } + } + + return plugins; + } + + private static Class resolvePluginClass(OzoneConfiguration conf, + String destName) { + String classnameProp = PLUGIN_DEST_BASE + destName + ".classname"; + LOG.info("Gettting classname for {} with propety {}", destName, classnameProp); + Class cls = conf.getClass(classnameProp, null, OMEventListener.class); + if (null == cls) { + throw new RuntimeException(String.format( + "Unable to load plugin %s, classname property %s is missing or does not implement OMEventListener", + destName, classnameProp)); + } + return cls; + } +} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java new file mode 100644 index 000000000000..dbda5e337cce --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/eventlistener/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains classes for the OM Event Listener implementation. + */ +package org.apache.hadoop.ozone.om.eventlistener; diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java new file mode 100644 index 000000000000..e58c588353e2 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/BarPlugin.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +/** + * Test utility for creating a dummy plugin. + */ +public class BarPlugin implements OMEventListener { + + private boolean initialized = false; + private boolean started = false; + private boolean shutdown = false; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + initialized = true; + } + + @Override + public void start() { + started = true; + } + + @Override + public void shutdown() { + shutdown = true; + } + + public boolean isInitialized() { + return initialized; + } + + public boolean isStarted() { + return started; + } + + public boolean isShutdown() { + return shutdown; + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java new file mode 100644 index 000000000000..e919eda603af --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/FooPlugin.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; + +/** + * Test utility for creating a dummy plugin. + */ +public class FooPlugin implements OMEventListener { + + private boolean initialized = false; + private boolean started = false; + private boolean shutdown = false; + + @Override + public void initialize(OzoneConfiguration conf, OMEventListenerPluginContext pluginContext) { + initialized = true; + } + + @Override + public void start() { + started = true; + } + + @Override + public void shutdown() { + shutdown = true; + } + + public boolean isInitialized() { + return initialized; + } + + public boolean isStarted() { + return started; + } + + public boolean isShutdown() { + return shutdown; + } +} diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java new file mode 100644 index 000000000000..70913c6e8201 --- /dev/null +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/eventlistener/TestOMEventListenerPluginManager.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om.eventlistener; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +/** + * Tests {@link OMEventListenerPluginManager}. + */ +@ExtendWith(MockitoExtension.class) +public class TestOMEventListenerPluginManager { + + @Mock + private OzoneManager ozoneManager; + + static List getLoadedPlugins(OMEventListenerPluginManager pluginManager) { + List loadedClasses = new ArrayList<>(); + for (OMEventListener plugin : pluginManager.getLoaded()) { + loadedClasses.add(plugin.getClass().getName()); + } + + // normalize + Collections.sort(loadedClasses); + + return loadedClasses; + } + + private static class BrokenFooPlugin { + + } + + @Test + public void testLoadSinglePlugin() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList("org.apache.hadoop.ozone.om.eventlistener.FooPlugin"), + getLoadedPlugins(pluginManager)); + } + + @Test + public void testLoadMultiplePlugins() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"); + conf.set("ozone.om.plugin.destination.bar", "enabled"); + conf.set("ozone.om.plugin.destination.bar.classname", "org.apache.hadoop.ozone.om.eventlistener.BarPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList("org.apache.hadoop.ozone.om.eventlistener.BarPlugin", + "org.apache.hadoop.ozone.om.eventlistener.FooPlugin"), + + getLoadedPlugins(pluginManager)); + } + + @Test + public void testPluginMissingClassname() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList(), + getLoadedPlugins(pluginManager)); + } + + @Test + public void testPluginClassDoesNotExist() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.NotExistingPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList(), + getLoadedPlugins(pluginManager)); + } + + @Test + public void testPluginClassDoesNotImplementInterface() throws InterruptedException { + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set("ozone.om.plugin.destination.foo", "enabled"); + conf.set("ozone.om.plugin.destination.foo.classname", "org.apache.hadoop.ozone.om.eventlistener.BrokenFooPlugin"); + + OMEventListenerPluginManager pluginManager = new OMEventListenerPluginManager(ozoneManager, conf); + + Assertions.assertEquals(Arrays.asList(), + getLoadedPlugins(pluginManager)); + } +} diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index 417f05064efd..4b42e0754177 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -46,6 +46,7 @@ mini-cluster multitenancy-ranger ozone-manager + ozone-manager-plugins ozonefs ozonefs-common recon diff --git a/pom.xml b/pom.xml index 7db5ff9b3189..098e99c32733 100644 --- a/pom.xml +++ b/pom.xml @@ -132,6 +132,7 @@ 2.1 1.1.1 5.14.2 + 3.9.2 1.0.1 1.9.25 2.7.1 @@ -995,6 +996,11 @@ httpcore-nio ${httpcore.version} + + org.apache.kafka + kafka-clients + ${kafka.version} + org.apache.kerby kerb-core @@ -1272,6 +1278,11 @@ ${ozone.version} test-jar + + org.apache.ozone + ozone-manager-plugins + ${ozone.version} + org.apache.ozone ozone-mini-cluster @@ -2144,6 +2155,7 @@ --> com.fasterxml.jackson.core:jackson-databind:jar org.apache.commons:commons-compress:jar + org.apache.hadoop:hadoop-common:jar org.apache.ozone:hdds-client:jar org.apache.ozone:ozone-interface-client:jar org.glassfish.jersey.core:jersey-common:jar