Skip to content

Commit

Permalink
GH-2821: Support for abort and timeouts on updates.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aklakan committed Nov 17, 2024
1 parent cf96347 commit b276f97
Show file tree
Hide file tree
Showing 35 changed files with 964 additions and 194 deletions.
14 changes: 14 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/http/HttpLib.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,21 @@ public static void handleResponseNoBody(HttpResponse<InputStream> response) {
* @return String
*/
public static String handleResponseRtnString(HttpResponse<InputStream> response) {
return handleResponseRtnString(response, null);
}

/**
* Handle the HTTP response and read the body to produce a string if a 200.
* Otherwise, throw an {@link HttpException}.
* @param response
* @param callback A callback that receives the opened input stream.
* @return String
*/
public static String handleResponseRtnString(HttpResponse<InputStream> response, Consumer<InputStream> callback) {
InputStream input = handleResponseInputStream(response);
if (callback != null) {
callback.accept(input);
}
try {
return IO.readWholeFileAsUTF8(input);
} catch (RuntimeIOException e) { throw new HttpException(e); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,31 +280,20 @@ public Y httpHeaders(Map<String, String> headers) {
public Y context(Context context) {
if ( context == null )
return thisBuilder();
ensureContext();
contextAcc.context(context);
//this.context.putAll(context);
return thisBuilder();
}

public Y set(Symbol symbol, Object value) {
ensureContext();
contextAcc.set(symbol, value);
//context.set(symbol, value);
return thisBuilder();
}

public Y set(Symbol symbol, boolean value) {
ensureContext();
contextAcc.set(symbol, value);
//context.set(symbol, value);
return thisBuilder();
}

private void ensureContext() {
// if ( context == null )
// context = new Context();
}

/**
* Set a timeout of the overall operation.
* Time-to-connect can be set with a custom {@link HttpClient} - see {@link java.net.http.HttpClient.Builder#connectTimeout(java.time.Duration)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.net.http.HttpClient;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.jena.graph.Node;
Expand All @@ -31,6 +32,7 @@
import org.apache.jena.sparql.exec.http.UpdateSendMode;
import org.apache.jena.sparql.syntax.syntaxtransform.UpdateTransformOps;
import org.apache.jena.sparql.util.Context;
import org.apache.jena.sparql.util.ContextAccumulator;
import org.apache.jena.sparql.util.Symbol;
import org.apache.jena.sys.JenaSystem;
import org.apache.jena.update.Update;
Expand All @@ -56,8 +58,9 @@ public String toString() {

/** Accumulator for update elements. Can build an overall string or UpdateRequest from the elements. */
private class UpdateEltAcc implements Iterable<UpdateElt> {
/** Delimiter for joining multiple SPARQL update strings into a single one. */
public static final String DELIMITER = ";\n";
/** Delimiter for joining multiple SPARQL update strings into a single one.
* The delimiter takes into account that the last line of a statement may be a single-line-comment. */
public static final String DELIMITER = "\n;\n";

private List<UpdateElt> updateOperations = new ArrayList<>();
private List<UpdateElt> updateOperationsView = Collections.unmodifiableList(updateOperations);
Expand All @@ -76,6 +79,7 @@ public void add(Update update) {
add(new UpdateElt(update));
}

/** Add a string by parsing it. */
public void add(String updateRequestString) {
UpdateRequest updateRequest = UpdateFactory.create(updateRequestString);
add(updateRequest);
Expand Down Expand Up @@ -146,9 +150,11 @@ public String buildString() {
protected UpdateSendMode sendMode = UpdateSendMode.systemDefault;
protected List<String> usingGraphURIs = null;
protected List<String> usingNamedGraphURIs = null;
protected Context context = null;
private ContextAccumulator contextAcc = ContextAccumulator.newBuilder(()->ARQ.getContext());
// Uses query rewrite to replace variables by values.
protected Map<Var, Node> substitutionMap = new HashMap<>();
protected long timeout = -1;
protected TimeUnit timeoutUnit = null;

protected ExecUpdateHTTPBuilder() {}

Expand Down Expand Up @@ -213,6 +219,12 @@ public Y substitution(Var var, Node value) {
return thisBuilder();
}

public Y timeout(long timeout, TimeUnit timeoutUnit) {
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
return thisBuilder();
}

public Y httpClient(HttpClient httpClient) {
this.httpClient = Objects.requireNonNull(httpClient);
return thisBuilder();
Expand Down Expand Up @@ -274,28 +286,20 @@ public Y httpHeaders(Map<String, String> headers) {
public Y context(Context context) {
if ( context == null )
return thisBuilder();
ensureContext();
this.context.setAll(context);
this.contextAcc.context(context);
return thisBuilder();
}

public Y set(Symbol symbol, Object value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return thisBuilder();
}

public Y set(Symbol symbol, boolean value) {
ensureContext();
this.context.set(symbol, value);
this.contextAcc.set(symbol, value);
return thisBuilder();
}

private void ensureContext() {
if ( context == null )
context = Context.create();
}

public X build() {
Objects.requireNonNull(serviceURL, "No service URL");
if ( updateEltAcc.isEmpty() )
Expand All @@ -322,7 +326,7 @@ public X build() {
// If the UpdateRequest object wasn't built until now then build the string instead.
String updateStringActual = updateActual == null ? updateEltAcc.buildString() : null;

Context cxt = (context!=null) ? context : ARQ.getContext().copy();
Context cxt = contextAcc.context();
return buildX(hClient, updateActual, updateStringActual, cxt);
}

Expand Down
11 changes: 11 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/query/ARQ.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,17 @@ public static void enableBlankNodeResultLabels(boolean val) {
*/
public static final Symbol queryTimeout = SystemARQ.allocSymbol("queryTimeout");

/**
* Set timeout. The value of this symbol gives the value of the timeout in milliseconds
* <ul>
* <li>A Number; the long value is used</li>
* <li>A string, e.g. "1000", parsed as a number</li>
* <li>A string, as two numbers separated by a comma, e.g. "500,10000" parsed as two numbers</li>
* </ul>
* @see org.apache.jena.update.UpdateExecutionBuilder#timeout(long, TimeUnit)
*/
public static final Symbol updateTimeout = SystemARQ.allocSymbol("updateTimeout");

// This can't be a context constant because NodeValues don't look in the context.
// /**
// * Context symbol controlling Roman Numerals in Filters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.graph.Graph;
import org.apache.jena.query.ARQ;
import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.engine.main.OpExecutor;
import org.apache.jena.sparql.engine.main.OpExecutorFactory;
Expand Down Expand Up @@ -78,18 +76,7 @@ public ExecutionContext(DatasetGraph dataset, OpExecutorFactory factory) {
}

public ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory) {
this(params, activeGraph, dataset, factory, cancellationSignal(params));
}

private static AtomicBoolean cancellationSignal(Context cxt) {
if ( cxt == null )
return null;
try {
return cxt.get(ARQConstants.symCancelQuery);
} catch (ClassCastException ex) {
Log.error(ExecutionContext.class, "Class cast exception: Expected AtomicBoolean for cancel control: "+ex.getMessage());
return null;
}
this(params, activeGraph, dataset, factory, Context.getCancelSignal(params));
}

private ExecutionContext(Context params, Graph activeGraph, DatasetGraph dataset, OpExecutorFactory factory, AtomicBoolean cancelSignal) {
Expand Down
Loading

0 comments on commit b276f97

Please sign in to comment.