Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.spiffe.workloadapi.grpc.Workload;
import io.spiffe.workloadapi.retry.RetryHandler;

import java.util.EnumSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -20,8 +22,15 @@ final class StreamObservers {
private static final Logger log =
Logger.getLogger(StreamObservers.class.getName());

private static final String INVALID_ARGUMENT = "INVALID_ARGUMENT";
private static final String STREAM_IS_COMPLETED = "Workload API stream is completed";
// Retry only transient stream failures. Caller/client cancellation and terminal API errors fail the watch closed.
private static final Set<Status.Code> NON_RETRYABLE_CODES = EnumSet.of(
Status.Code.INVALID_ARGUMENT,
Status.Code.CANCELLED,
Status.Code.PERMISSION_DENIED,
Status.Code.UNAUTHENTICATED,
Status.Code.UNIMPLEMENTED,
Status.Code.FAILED_PRECONDITION);

private StreamObservers() {
}
Expand Down Expand Up @@ -63,13 +72,15 @@ private void handleWatchX509ContextError(final Throwable t) {
private void handleX509ContextRetry(Throwable t) {
if (retryHandler.shouldRetry()) {
log.log(Level.FINE, "Retrying connecting to Workload API to register X.509 context watcher");
retryHandler.scheduleRetry(() ->
boolean retryScheduled = retryHandler.scheduleRetry(() ->
cancellableContext.run(
() -> workloadApiAsyncStub.fetchX509SVID(newX509SvidRequest(),
this)));
} else {
watcher.onError(new X509ContextException("Cancelling X.509 Context watch", t));
if (retryScheduled) {
return;
}
}
watcher.onError(new X509ContextException("Cancelling X.509 Context watch", t));
}

@Override
Expand Down Expand Up @@ -117,13 +128,15 @@ private void handleWatchX509BundlesError(final Throwable t) {
private void handleX509BundlesRetry(Throwable t) {
if (retryHandler.shouldRetry()) {
log.log(Level.FINE, "Retrying connecting to Workload API to register X.509 bundles watcher");
retryHandler.scheduleRetry(() ->
boolean retryScheduled = retryHandler.scheduleRetry(() ->
cancellableContext.run(
() -> workloadApiAsyncStub.fetchX509Bundles(newX509BundlesRequest(),
this)));
} else {
watcher.onError(new X509BundleException("Cancelling X.509 bundles watch", t));
if (retryScheduled) {
return;
}
}
watcher.onError(new X509BundleException("Cancelling X.509 bundles watch", t));
}

@Override
Expand Down Expand Up @@ -171,12 +184,14 @@ private void handleWatchJwtBundleError(final Throwable t) {
private void handleJwtBundleRetry(Throwable t) {
if (retryHandler.shouldRetry()) {
log.log(Level.FINE, "Retrying connecting to Workload API to register JWT Bundles watcher");
retryHandler.scheduleRetry(() ->
boolean retryScheduled = retryHandler.scheduleRetry(() ->
cancellableContext.run(() -> workloadApiAsyncStub.fetchJWTBundles(newJwtBundlesRequest(),
this)));
} else {
watcher.onError(new JwtBundleException("Cancelling JWT Bundles watch", t));
if (retryScheduled) {
return;
}
}
watcher.onError(new JwtBundleException("Cancelling JWT Bundles watch", t));
}

@Override
Expand All @@ -188,7 +203,7 @@ public void onCompleted() {
}

private static boolean isErrorNotRetryable(Throwable t) {
return INVALID_ARGUMENT.equals(Status.fromThrowable(t).getCode().name());
return NON_RETRYABLE_CODES.contains(Status.fromThrowable(t).getCode());
}

private static Workload.X509SVIDRequest newX509SvidRequest() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.spiffe.workloadapi.retry;

import java.time.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -26,18 +27,26 @@ public RetryHandler(final ExponentialBackoffPolicy exponentialBackoffPolicy, fin
* Updates the next delay and retries count.
*
* @param runnable the task to be scheduled for execution
* @return true if the retry was scheduled, false otherwise
*/
public void scheduleRetry(final Runnable runnable) {
public boolean scheduleRetry(final Runnable runnable) {
if (executor.isShutdown()) {
return;
return false;
}

if (exponentialBackoffPolicy.reachedMaxRetries(retryCount)) {
return;
return false;
}
executor.schedule(runnable, nextDelay.getSeconds(), TimeUnit.SECONDS);

try {
executor.schedule(runnable, nextDelay.getSeconds(), TimeUnit.SECONDS);
} catch (RejectedExecutionException e) {
return false;
}

nextDelay = exponentialBackoffPolicy.nextDelay(nextDelay);
retryCount++;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.spiffe.workloadapi.retry;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
Expand All @@ -11,16 +12,25 @@
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.*;

class RetryHandlerTest {

@Mock
ScheduledExecutorService scheduledExecutorService;

private AutoCloseable mocks;

@BeforeEach
void setup() {
MockitoAnnotations.initMocks(this);
mocks = MockitoAnnotations.openMocks(this);
}

@AfterEach
void tearDown() throws Exception {
mocks.close();
}

@Test
Expand All @@ -30,23 +40,23 @@ void testScheduleRetry_defaultPolicy() {

RetryHandler retryHandler = new RetryHandler(exponentialBackoffPolicy, scheduledExecutorService);

retryHandler.scheduleRetry(runnable);
assertTrue(retryHandler.scheduleRetry(runnable));

verify(scheduledExecutorService).schedule(runnable, 1, TimeUnit.SECONDS);
assertEquals(1, retryHandler.getRetryCount());

// second retry
retryHandler.scheduleRetry(runnable);
assertTrue(retryHandler.scheduleRetry(runnable));
assertEquals(2, retryHandler.getRetryCount());
verify(scheduledExecutorService).schedule(runnable, 2, TimeUnit.SECONDS);

// third retry
retryHandler.scheduleRetry(runnable);
assertTrue(retryHandler.scheduleRetry(runnable));
assertEquals(3, retryHandler.getRetryCount());
verify(scheduledExecutorService).schedule(runnable, 4, TimeUnit.SECONDS);

// fourth retry
retryHandler.scheduleRetry(runnable);
assertTrue(retryHandler.scheduleRetry(runnable));
assertEquals(4, retryHandler.getRetryCount());
verify(scheduledExecutorService).schedule(runnable, 8, TimeUnit.SECONDS);
}
Expand All @@ -58,27 +68,41 @@ void testScheduleRetry_maxRetries() {

RetryHandler retryHandler = new RetryHandler(exponentialBackoffPolicy, scheduledExecutorService);

retryHandler.scheduleRetry(runnable);
assertTrue(retryHandler.scheduleRetry(runnable));

verify(scheduledExecutorService).schedule(runnable, 1, TimeUnit.SECONDS);
assertEquals(1, retryHandler.getRetryCount());

// second retry
retryHandler.scheduleRetry(runnable);
assertTrue(retryHandler.scheduleRetry(runnable));
assertEquals(2, retryHandler.getRetryCount());
verify(scheduledExecutorService).schedule(runnable, 2, TimeUnit.SECONDS);

// third retry
retryHandler.scheduleRetry(runnable);
assertTrue(retryHandler.scheduleRetry(runnable));
assertEquals(3, retryHandler.getRetryCount());
verify(scheduledExecutorService).schedule(runnable, 4, TimeUnit.SECONDS);

Mockito.reset(scheduledExecutorService);

// fourth retry exceeds max retries
retryHandler.scheduleRetry(runnable);
assertFalse(retryHandler.scheduleRetry(runnable));
verify(scheduledExecutorService).isShutdown();
verifyNoMoreInteractions(scheduledExecutorService);
}

@Test
void testScheduleRetry_executorShutdown() {
Runnable runnable = () -> { };
ExponentialBackoffPolicy exponentialBackoffPolicy = ExponentialBackoffPolicy.DEFAULT;
when(scheduledExecutorService.isShutdown()).thenReturn(true);

RetryHandler retryHandler = new RetryHandler(exponentialBackoffPolicy, scheduledExecutorService);

assertFalse(retryHandler.scheduleRetry(runnable));
verify(scheduledExecutorService).isShutdown();
verifyNoMoreInteractions(scheduledExecutorService);
assertEquals(0, retryHandler.getRetryCount());
}

@Test
Expand Down