Skip to content

Commit

Permalink
Diagnostic API and service for Inputs (#20958)
Browse files Browse the repository at this point in the history
* API skeleton

* extract results and refactor

* adjust tests

* improve error handling
  • Loading branch information
patrickmann authored Nov 26, 2024
1 parent ab51ce8 commit c82f761
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.inputs;

import com.google.common.collect.ImmutableSet;
import jakarta.inject.Inject;
import jakarta.ws.rs.InternalServerErrorException;
import org.apache.commons.lang3.StringUtils;
import org.graylog.plugins.views.search.Query;
import org.graylog.plugins.views.search.QueryResult;
import org.graylog.plugins.views.search.Search;
import org.graylog.plugins.views.search.SearchJob;
import org.graylog.plugins.views.search.SearchType;
import org.graylog.plugins.views.search.elasticsearch.ElasticsearchQueryString;
import org.graylog.plugins.views.search.engine.SearchExecutor;
import org.graylog.plugins.views.search.errors.SearchError;
import org.graylog.plugins.views.search.permissions.SearchUser;
import org.graylog.plugins.views.search.rest.ExecutionState;
import org.graylog.plugins.views.search.rest.SearchJobDTO;
import org.graylog.plugins.views.search.searchtypes.pivot.Pivot;
import org.graylog.plugins.views.search.searchtypes.pivot.PivotResult;
import org.graylog.plugins.views.search.searchtypes.pivot.buckets.Values;
import org.graylog.plugins.views.search.searchtypes.pivot.series.Count;
import org.graylog2.database.NotFoundException;
import org.graylog2.plugin.indexer.searches.timeranges.RelativeRange;
import org.graylog2.rest.models.system.inputs.responses.InputDiagnostics;
import org.graylog2.streams.StreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.graylog2.plugin.Message.FIELD_GL2_SOURCE_INPUT;
import static org.graylog2.rest.models.system.inputs.responses.InputDiagnostics.EMPTY_DIAGNOSTICS;
import static org.graylog2.shared.utilities.StringUtils.f;

public class InputDiagnosticService {
private static final Logger LOG = LoggerFactory.getLogger(InputDiagnosticService.class);

private static final String QUERY_ID = "input_diagnostics_streams_query";
private static final String PIVOT_ID = "input_diagnostics_streams_pivot";

private final SearchExecutor searchExecutor;
private final StreamService streamService;

@Inject
public InputDiagnosticService(SearchExecutor searchExecutor,
StreamService streamService) {
this.searchExecutor = searchExecutor;
this.streamService = streamService;
}

public InputDiagnostics getInputDiagnostics(
Input input, SearchUser searchUser) {
final Search search = buildSearch(input);
final SearchJob searchJob = searchExecutor.executeSync(search, searchUser, ExecutionState.empty());
final SearchJobDTO searchJobDTO = SearchJobDTO.fromSearchJob(searchJob);
final QueryResult queryResult = searchJobDTO.results().get(QUERY_ID);

final Set<SearchError> errors = queryResult.errors();
if (errors != null && !errors.isEmpty()) {
String errorMsg = f("An error occurred while executing aggregation: %s",
errors.stream().map(SearchError::description).collect(Collectors.joining(", ")));
LOG.error(errorMsg);
throw new InternalServerErrorException(errorMsg);
}

final SearchType.Result aggregationResult = queryResult.searchTypes().get(PIVOT_ID);
if (aggregationResult instanceof PivotResult pivotResult && pivotResult.total() > 0) {
final List<AbstractMap.SimpleEntry<String, Long>> resultList = pivotResult.rows().stream()
.filter(row -> row.source().equals("leaf"))
.map(InputDiagnosticService::extractValues)
.toList();
Map<String, Long> resultMap = new HashMap<>();
resultList.forEach(
entry -> {
try {
final org.graylog2.plugin.streams.Stream stream = streamService.load(entry.getKey());
resultMap.put(stream.getTitle(), entry.getValue());
} catch (NotFoundException e) {
LOG.warn("Unable to load stream {}", entry.getKey(), e);
}
}
);
return new InputDiagnostics(resultMap);
}

return EMPTY_DIAGNOSTICS;
}

private Search buildSearch(Input input) {
final SearchType searchType = Pivot.builder()
.id(PIVOT_ID)
.rollup(true)
.rowGroups(Values.builder().fields(List.of("streams")).build())
.series(Count.builder().build())
.build();
return Search.builder()
.queries(ImmutableSet.of(
Query.builder()
.id(QUERY_ID)
.query(ElasticsearchQueryString.of(FIELD_GL2_SOURCE_INPUT + ":" + input.getId()))
.searchTypes(Collections.singleton(searchType))
.timerange(RelativeRange.create(900))
.build()
))
.build();
}

private static AbstractMap.SimpleEntry<String, Long> extractValues(PivotResult.Row r) {
if (r.values().size() != 1) {
String errorMsg = f("Expected 1 value in aggregation result, but received [%d].", r.values().size());
LOG.warn(errorMsg);
throw new InternalServerErrorException(errorMsg);
}
final String streamId = r.key().get(0);
if (StringUtils.isEmpty(streamId)) {
String errorMsg = "Unable to retrieve stream ID from query result";
LOG.warn(errorMsg);
throw new InternalServerErrorException(errorMsg);
}

final Long count = (Long) r.values().get(0).value();
return new AbstractMap.SimpleEntry<>(streamId, count);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.rest.models.system.inputs.responses;

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.Map;

public record InputDiagnostics(
@JsonProperty("stream_message_count") Map<String, Long> streamMessageCount) {
public static InputDiagnostics EMPTY_DIAGNOSTICS = new InputDiagnostics(Collections.emptyMap());
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,38 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import jakarta.inject.Inject;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.graylog.plugins.views.search.permissions.SearchUser;
import org.graylog2.Configuration;
import org.graylog2.audit.AuditEventTypes;
import org.graylog2.audit.jersey.AuditEvent;
import org.graylog2.inputs.Input;
import org.graylog2.inputs.InputDiagnosticService;
import org.graylog2.inputs.InputService;
import org.graylog2.inputs.encryption.EncryptedInputConfigs;
import org.graylog2.plugin.configuration.ConfigurationException;
import org.graylog2.plugin.database.ValidationException;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.rest.models.system.inputs.requests.InputCreateRequest;
import org.graylog2.rest.models.system.inputs.responses.InputCreated;
import org.graylog2.rest.models.system.inputs.responses.InputDiagnostics;
import org.graylog2.rest.models.system.inputs.responses.InputSummary;
import org.graylog2.rest.models.system.inputs.responses.InputsList;
import org.graylog2.shared.inputs.MessageInputFactory;
Expand All @@ -43,24 +62,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.PUT;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;

import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
Expand All @@ -81,13 +82,15 @@ public class InputsResource extends AbstractInputsResource {
private static final Logger LOG = LoggerFactory.getLogger(InputsResource.class);

private final InputService inputService;
private final InputDiagnosticService inputDiagnosticService;
private final MessageInputFactory messageInputFactory;
private final Configuration config;

@Inject
public InputsResource(InputService inputService, MessageInputFactory messageInputFactory, Configuration config) {
public InputsResource(InputService inputService, InputDiagnosticService inputDiagnosticService, MessageInputFactory messageInputFactory, Configuration config) {
super(messageInputFactory.getAvailableInputs());
this.inputService = inputService;
this.inputDiagnosticService = inputDiagnosticService;
this.messageInputFactory = messageInputFactory;
this.config = config;
}
Expand All @@ -108,6 +111,20 @@ public InputSummary get(@ApiParam(name = "inputId", required = true)
return getInputSummary(input);
}

@GET
@Timed
@ApiOperation(value = "Get diagnostic information of a single input")
@Path("/diagnostics/{inputId}")
@ApiResponses(value = {
@ApiResponse(code = 404, message = "No such input.")
})
public InputDiagnostics diagnostics(@ApiParam(name = "inputId", required = true)
@PathParam("inputId") String inputId,
@Context SearchUser searchUser) throws org.graylog2.database.NotFoundException {
checkPermission(RestPermissions.INPUTS_READ, inputId);
return inputDiagnosticService.getInputDiagnostics(inputService.find(inputId), searchUser);
}

@GET
@Timed
@ApiOperation(value = "Get all inputs")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.graylog2.Configuration;
import org.graylog2.database.NotFoundException;
import org.graylog2.inputs.Input;
import org.graylog2.inputs.InputDiagnosticService;
import org.graylog2.inputs.InputService;
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.configuration.fields.ConfigurationField;
Expand Down Expand Up @@ -67,7 +68,7 @@ public class InputsResourceMaskingPasswordsTest {

class InputsTestResource extends InputsResource {
public InputsTestResource(InputService inputService, MessageInputFactory messageInputFactory) {
super(inputService, messageInputFactory, new Configuration());
super(inputService, mock(InputDiagnosticService.class), messageInputFactory, new Configuration());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import jakarta.ws.rs.BadRequestException;
import org.graylog2.Configuration;
import org.graylog2.configuration.HttpConfiguration;
import org.graylog2.inputs.InputDiagnosticService;
import org.graylog2.inputs.InputService;
import org.graylog2.plugin.database.users.User;
import org.graylog2.plugin.inputs.MessageInput;
Expand Down Expand Up @@ -116,7 +117,7 @@ static class InputsTestResource extends InputsResource {
public InputsTestResource(InputService inputService,
MessageInputFactory messageInputFactory,
Configuration config) {
super(inputService, messageInputFactory, config);
super(inputService, mock(InputDiagnosticService.class), messageInputFactory, config);
configuration = mock(HttpConfiguration.class);
this.user = mock(User.class);
lenient().when(user.getName()).thenReturn("foo");
Expand Down

0 comments on commit c82f761

Please sign in to comment.