Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/src/main/java/io/ably/lib/http/HttpScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,18 @@ private String extendMessage(String msg) {

@Override
public void run() {
String candidateHost = httpCore.hosts.getPreferredHost();
String preferredHost = httpCore.hosts.getPreferredHost();
String candidateHost = preferredHost;
int retryCountRemaining = (httpCore.hosts.fallbackHostsRemaining(candidateHost) > 0) ? httpCore.options.httpMaxRetryCount : 0;

while(!isCancelled) {
try {
boolean shouldPersist = !candidateHost.equals(preferredHost);
result = httpExecuteWithRetry(candidateHost, path, requireAblyAuth);
setResult(result);
httpCore.hosts.setPreferredHost(candidateHost, true);
if (shouldPersist) {
httpCore.hosts.setPreferredHost(candidateHost, true);
}
Comment thread
ttypic marked this conversation as resolved.
break;
} catch (AblyException.HostFailedException e) {
if(--retryCountRemaining < 0) {
Expand Down
135 changes: 135 additions & 0 deletions lib/src/test/java/io/ably/lib/test/rest/HttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,23 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import io.ably.lib.debug.DebugOptions;
import io.ably.lib.network.HttpRequest;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -1382,4 +1393,128 @@ public void describeTo(Description description) {
description.appendText(errorInfo.toString());
}
}

/**
* Verifies RSC15f: a late-arriving success from a fallback host must not
* re-pin the fallback when the fallbackRetryTimeout has already expired.
*
* Sequence:
* req1 primary → 500 (triggers fallback)
* req2 fallback → 200 immediate (pins fallback, timeout = 100 ms)
* req3 fallback → 200 delayed 200 ms (in-flight when timeout expires)
* req4 primary → 200 immediate (timeout expired → primary tried again)
* req5 primary → 200 immediate (late req3 success must NOT have re-pinned)
*/
@Test
public void http_fallback_late_success_does_not_repin_expired_timeout() throws Exception {
final String primaryHost = "main.realtime.ably.net";
final String fallbackHost = "main.a.fallback.ably-realtime.com";

final List<String> capturedHosts = Collections.synchronizedList(new ArrayList<>());
final CountDownLatch delayedRequestStarted = new CountDownLatch(1);
final AtomicInteger requestCount = new AtomicInteger(0);

DebugOptions opts = new DebugOptions("appId.keyId:keySecret");
opts.restHost = primaryHost;
opts.fallbackHosts = new String[]{fallbackHost};
opts.fallbackRetryTimeout = 100L;
opts.httpMaxRetryCount = 1;
opts.httpListener = new DebugOptions.RawHttpListener() {
@Override
public HttpCore.Response onRawHttpRequest(String id, HttpRequest request, String authHeader,
Map<String, List<String>> requestHeaders, HttpCore.RequestBody requestBody) {
int n = requestCount.incrementAndGet();
capturedHosts.add(request.getUrl().getHost());

HttpCore.Response r = new HttpCore.Response();
switch (n) {
case 1:
r.statusCode = 500;
r.statusLine = "Internal Server Error";
break;
case 2:
r.statusCode = 200;
r.contentType = "application/json";
r.body = "[1000]".getBytes();
r.contentLength = r.body.length;
break;
case 3:
delayedRequestStarted.countDown();
try { Thread.sleep(200L); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
r.statusCode = 200;
r.contentType = "application/json";
r.body = "[2000]".getBytes();
r.contentLength = r.body.length;
break;
case 4:
r.statusCode = 200;
r.contentType = "application/json";
r.body = "[3000]".getBytes();
r.contentLength = r.body.length;
break;
case 5:
r.statusCode = 200;
r.contentType = "application/json";
r.body = "[4000]".getBytes();
r.contentLength = r.body.length;
break;
default:
r.statusCode = 500;
r.statusLine = "Unexpected extra request";
}
return r;
}
@Override public void onRawHttpResponse(String id, String method, HttpCore.Response r) {}
@Override public void onRawHttpException(String id, String method, Throwable t) {}
};

AblyRest client = new AblyRest(opts);

// req1 (primary→500) + req2 (fallback→200); fallback is now pinned
client.time();

// req3: fire in background — hits pinned fallback, sleeps 200 ms inside listener
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Future<Long> requestFuture = executor.submit(() -> {
try { return client.time(); }
catch (AblyException e) { throw new RuntimeException(e); }
});

// Wait until req3 has actually entered the listener before starting the clock
assertTrue("Delayed request must start within 5 s", delayedRequestStarted.await(5, TimeUnit.SECONDS));

// Wait 150 ms so that fallbackRetryTimeout (100 ms) expires
Thread.sleep(150L);

// req4: timeout expired → primary tried again
client.time();

// Wait for req3's delayed response to arrive (late fallback success)
requestFuture.get(5, TimeUnit.SECONDS);

// req5: late success from req3 must NOT have re-pinned the fallback
client.time();

assertEquals(5, capturedHosts.size());
assertEquals(primaryHost, capturedHosts.get(0)); // req1 – primary fails
assertEquals(fallbackHost, capturedHosts.get(1)); // req2 – fallback pins
assertEquals(fallbackHost, capturedHosts.get(2)); // req3 – in-flight fallback
assertEquals(primaryHost, capturedHosts.get(3)); // req4 – timeout expired
assertEquals(primaryHost, capturedHosts.get(4)); // req5 – late success did not re-pin
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
executor.awaitTermination(5, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
Loading