Skip to content

Commit

Permalink
Added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xunyin8 committed Nov 25, 2024
1 parent 6b541a1 commit 60f9774
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.linkedin.venice.utils.IteratorUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.AbstractVeniceWriter;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.BytesWritable;
Expand Down Expand Up @@ -174,4 +176,22 @@ private long getTotalIncomingDataSizeInBytes(JobConf jobConfig) {
protected void setHadoopJobClientProvider(HadoopJobClientProvider hadoopJobClientProvider) {
this.hadoopJobClientProvider = hadoopJobClientProvider;
}

// Visible for testing
@Override
protected AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter(
VeniceWriterFactory factory,
VeniceWriter<byte[], byte[], byte[]> mainWriter,
String flatViewConfigMapString,
String topicName,
boolean chunkingEnabled,
boolean rmdChunkingEnabled) {
return super.createCompositeVeniceWriter(
factory,
mainWriter,
flatViewConfigMapString,
topicName,
chunkingEnabled,
rmdChunkingEnabled);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.views.ViewUtils;
import com.linkedin.venice.writer.AbstractVeniceWriter;
import com.linkedin.venice.writer.CompositeVeniceWriter;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import com.linkedin.venice.writer.update.CompositeVeniceWriter;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -381,7 +381,8 @@ private AbstractVeniceWriter<byte[], byte[], byte[]> createBasicVeniceWriter() {
}
}

AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter(
// protected and package private for testing purposes
protected AbstractVeniceWriter<byte[], byte[], byte[]> createCompositeVeniceWriter(
VeniceWriterFactory factory,
VeniceWriter<byte[], byte[], byte[]> mainWriter,
String flatViewConfigMapString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.exceptions.RecordTooLargeException;
import com.linkedin.venice.exceptions.VeniceException;
Expand All @@ -30,20 +31,31 @@
import com.linkedin.venice.hadoop.task.datawriter.AbstractPartitionWriter;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.ViewConfig;
import com.linkedin.venice.meta.ViewConfigImpl;
import com.linkedin.venice.meta.ViewParameters;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.pubsub.adapter.SimplePubSubProduceResultImpl;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.views.MaterializedView;
import com.linkedin.venice.views.VeniceView;
import com.linkedin.venice.views.ViewUtils;
import com.linkedin.venice.writer.AbstractVeniceWriter;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -618,6 +630,51 @@ public void close() throws IOException {
Assert.assertThrows(VeniceException.class, () -> reducer.close());
}

@Test
public void testCreateCompositeVeniceWriter() throws JsonProcessingException {
VeniceReducer reducer = new VeniceReducer();
VeniceWriterFactory writerFactory = mock(VeniceWriterFactory.class);
VeniceWriter<byte[], byte[], byte[]> mainWriter = mock(VeniceWriter.class);
Map<String, ViewConfig> viewConfigMap = new HashMap<>();
String view1Name = "view1";
ViewParameters.Builder builder = new ViewParameters.Builder(view1Name);
builder.setPartitionCount(6);
builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
ViewConfigImpl viewConfig1 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
viewConfigMap.put(view1Name, viewConfig1);
String view2Name = "view2";
builder = new ViewParameters.Builder(view2Name);
builder.setPartitionCount(12);
builder.setPartitioner(DefaultVenicePartitioner.class.getCanonicalName());
ViewConfigImpl viewConfig2 = new ViewConfigImpl(MaterializedView.class.getCanonicalName(), builder.build());
viewConfigMap.put(view2Name, viewConfig2);
String flatViewConfigMapString = ViewUtils.flatViewConfigMapString(viewConfigMap);
String topicName = "test_v1";
reducer.createCompositeVeniceWriter(writerFactory, mainWriter, flatViewConfigMapString, topicName, true, true);
ArgumentCaptor<VeniceWriterOptions> vwOptionsCaptor = ArgumentCaptor.forClass(VeniceWriterOptions.class);
verify(writerFactory, times(2)).createVeniceWriter(vwOptionsCaptor.capture());
Map<Integer, VeniceView> verifyPartitionToViewsMap = new HashMap<>();
verifyPartitionToViewsMap.put(
6,
ViewUtils
.getVeniceView(viewConfig1.getViewClassName(), new Properties(), "test", viewConfig1.getViewParameters()));
verifyPartitionToViewsMap.put(
12,
ViewUtils
.getVeniceView(viewConfig2.getViewClassName(), new Properties(), "test", viewConfig2.getViewParameters()));
for (VeniceWriterOptions options: vwOptionsCaptor.getAllValues()) {
int partitionCount = options.getPartitionCount();
Assert.assertTrue(verifyPartitionToViewsMap.containsKey(partitionCount));
VeniceView veniceView = verifyPartitionToViewsMap.get(partitionCount);
Assert.assertTrue(veniceView instanceof MaterializedView);
MaterializedView materializedView = (MaterializedView) veniceView;
Assert.assertEquals(
options.getTopicName(),
materializedView.getTopicNamesAndConfigsForVersion(1).keySet().stream().findAny().get());
Assert.assertTrue(materializedView.getViewPartitioner() instanceof DefaultVenicePartitioner);
}
}

private Reporter createZeroCountReporterMock() {
Reporter mockReporter = mock(Reporter.class);
Counters.Counter mockCounters = mock(Counters.Counter.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package com.linkedin.venice.writer.update;
package com.linkedin.venice.writer;

import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.writer.AbstractVeniceWriter;
import com.linkedin.venice.writer.DeleteMetadata;
import com.linkedin.venice.writer.PutMetadata;
import com.linkedin.venice.writer.VeniceWriter;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.linkedin.venice.writer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import java.util.concurrent.CompletableFuture;
import org.testng.Assert;
import org.testng.annotations.Test;


public class CompositeVeniceWriterTest {
@Test
public void testFlushChecksForLastWriteFuture() {
VeniceWriter<byte[], byte[], byte[]> mockWriter = mock(VeniceWriter.class);
CompletableFuture<PubSubProduceResult> mainWriterFuture = new CompletableFuture<>();
doReturn(mainWriterFuture).when(mockWriter).put(any(), any(), anyInt(), eq(null));
mainWriterFuture.completeExceptionally(new VeniceException("Expected exception"));
AbstractVeniceWriter<byte[], byte[], byte[]> compositeVeniceWriter =
new CompositeVeniceWriter("test_v1", mockWriter, new VeniceWriter[0], null);
compositeVeniceWriter.put(new byte[1], new byte[1], 1, null);
VeniceException e = Assert.expectThrows(VeniceException.class, compositeVeniceWriter::flush);
Assert.assertTrue(e.getCause().getMessage().contains("Expected"));
}

@Test
public void testWritesAreInOrder() throws InterruptedException {
VeniceWriter<byte[], byte[], byte[]> mockMainWriter = mock(VeniceWriter.class);
CompletableFuture<PubSubProduceResult> mainWriterFuture = CompletableFuture.completedFuture(null);
doReturn(mainWriterFuture).when(mockMainWriter).put(any(), any(), anyInt(), eq(null));
VeniceWriter<byte[], byte[], byte[]> mockChildWriter = mock(VeniceWriter.class);
CompletableFuture<PubSubProduceResult> childWriterFuture = new CompletableFuture<>();
doReturn(childWriterFuture).when(mockChildWriter).put(any(), any(), anyInt(), eq(null));
VeniceWriter[] childWriters = new VeniceWriter[1];
childWriters[0] = mockChildWriter;
AbstractVeniceWriter<byte[], byte[], byte[]> compositeVeniceWriter =
new CompositeVeniceWriter<byte[], byte[], byte[]>("test_v1", mockMainWriter, childWriters, null);
compositeVeniceWriter.put(new byte[1], new byte[1], 1, null);
verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
Thread.sleep(1000);
verify(mockMainWriter, never()).put(any(), any(), anyInt(), eq(null));
childWriterFuture.complete(null);
verify(mockMainWriter, timeout(1000)).put(any(), any(), anyInt(), eq(null));
}
}

0 comments on commit 60f9774

Please sign in to comment.