diff --git a/README.md b/README.md
index 2e3cb2f6..1261fba2 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ The easiest way is to download the jar as well as its transitive dependencies (o
com.couchbase.client
java-client
- 2.5.5
+ 2.5.9
```
diff --git a/pom.xml b/pom.xml
index 1b7bd017..10a1e06d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
com.couchbase.client
java-client
- 2.5.6-SNAPSHOT
+ 2.5.10-SNAPSHOT
jar
Couchbase Java SDK
@@ -28,7 +28,7 @@
UTF-8
UTF-8
- 1.5.6-SNAPSHOT
+ 1.5.10-SNAPSHOT
4.12
1.10.19
1.7.7
diff --git a/src/integration/java/com/couchbase/client/java/ConnectionTest.java b/src/integration/java/com/couchbase/client/java/ConnectionTest.java
index 444e0f4c..7eadf1a3 100644
--- a/src/integration/java/com/couchbase/client/java/ConnectionTest.java
+++ b/src/integration/java/com/couchbase/client/java/ConnectionTest.java
@@ -23,6 +23,7 @@
import static org.junit.Assume.assumeTrue;
import com.couchbase.client.java.cluster.ClusterManager;
+import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.error.BucketDoesNotExistException;
import com.couchbase.client.java.error.InvalidPasswordException;
import com.couchbase.client.java.util.TestProperties;
@@ -75,6 +76,26 @@ public void shouldThrowConfigurationExceptionForWrongBucketPassword() {
cluster.openBucket(TestProperties.bucket(), "completelyWrongPassword");
}
+ @Test
+ public void shouldBootstrapWithBadHost() {
+ Cluster cluster = CouchbaseCluster.create("badnode", TestProperties.seedNode());
+ cluster.openBucket(TestProperties.bucket(), TestProperties.password());
+ }
+
+ @Test
+ public void shouldProvideClusterInfoWithBadHostInBootstrapList() {
+ Cluster cluster = CouchbaseCluster.create("x.y.z", TestProperties.seedNode());
+ cluster.authenticate(TestProperties.adminName(), TestProperties.adminPassword());
+ cluster.clusterManager().info();
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void shouldNotCheckReverseLookupWhenDNSSRVEnabled() throws Exception {
+ Cluster cluster = CouchbaseCluster.create(DefaultCouchbaseEnvironment.builder().dnsSrvEnabled(true).build(), "x.y.z");
+ cluster.authenticate(TestProperties.adminName(), TestProperties.adminPassword());
+ cluster.clusterManager().info();
+ }
+
@Test
public void shouldCacheBucketReference() {
Cluster cluster = CouchbaseCluster.create(TestProperties.seedNode());
diff --git a/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java b/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java
index c0253042..419658d6 100644
--- a/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java
+++ b/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java
@@ -340,4 +340,22 @@ public void shouldWorkWithPrettyFalse() {
assertNotNull(result.allRows());
assertNotNull(result.errors());
}
+
+ @Test
+ public void shouldWorkWithProfileInfoEnabled() {
+ ctx.ignoreIfClusterUnder(Version.parseVersion("4.5.1"));
+ N1qlQuery query = N1qlQuery.simple(
+ select("*").fromCurrentBucket().limit(1),
+ N1qlParams.build().profile(N1qlProfile.TIMINGS).consistency(CONSISTENCY)
+ );
+
+ N1qlQueryResult result = ctx.bucket().query(query);
+ assertEquals(1, result.allRows().size());
+ assertTrue(result.parseSuccess());
+ assertTrue(result.finalSuccess());
+ assertNotNull(result.info());
+ assertNotNull(result.allRows());
+ assertNotNull(result.errors());
+ assertTrue(result.profileInfo().size() > 0);
+ }
}
diff --git a/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java b/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java
index 4dd74757..364a727b 100644
--- a/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java
+++ b/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java
@@ -19,7 +19,12 @@
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.error.subdoc.PathInvalidException;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.java.bucket.BucketType;
@@ -34,6 +39,8 @@
import com.couchbase.client.java.util.CouchbaseTestContext;
import com.couchbase.client.java.util.features.CouchbaseFeature;
import org.junit.*;
+import rx.functions.Action0;
+import rx.functions.Action1;
public class DataStructuresTest {
@@ -357,4 +364,51 @@ public void testSetSizeOnNonExistentDocument() {
ctx.bucket().setSize("dssetRandom");
}
+
+ @Test
+ public void testMultiThreadQueuePop() throws Exception {
+ ctx.bucket().queuePush("testMultiThreadQueuePop", 1, MutationOptionBuilder.builder().createDocument(true));
+
+ final CountDownLatch latch = new CountDownLatch(10);
+ ExecutorService pool = Executors.newFixedThreadPool(10);
+ final AtomicInteger atomicInteger = new AtomicInteger();
+
+ for (int i=0; i < 10; i++) {
+ pool.submit(new Runnable() {
+ @Override
+ public void run() {
+ ctx.bucket().async().queuePop("testMultiThreadQueuePop", Integer.class)
+ .subscribe(
+ new Action1() {
+ @Override
+ public void call(Integer val) {
+ if (val == 1) {
+ atomicInteger.incrementAndGet();
+ } else {
+ Assert.assertEquals(val, null);
+ }
+ }
+ },
+ new Action1() {
+ @Override
+ public void call(Throwable throwable) {
+ //ignore
+ latch.countDown();
+ }
+ },
+ new Action0() {
+ @Override
+ public void call() {
+ latch.countDown();
+ }
+ }
+
+ );
+ }
+ });
+ }
+ latch.await();
+ Assert.assertEquals(1, atomicInteger.get());
+ }
+
}
\ No newline at end of file
diff --git a/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java b/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java
index 56348564..459ca538 100644
--- a/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java
+++ b/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java
@@ -17,6 +17,7 @@
import com.couchbase.client.java.document.EntityDocument;
import com.couchbase.client.java.document.JsonDocument;
+import com.couchbase.client.java.repository.annotation.Field;
import com.couchbase.client.java.repository.annotation.Id;
import com.couchbase.client.java.repository.mapping.RepositoryMappingException;
import com.couchbase.client.java.util.ClusterDependentTest;
@@ -136,6 +137,27 @@ public void shouldFailWithNonStringIdProperty() {
repository().upsert(EntityDocument.create(new EntityWithNoNStringId()));
}
+ @Test
+ public void shouldUpsertExtendedEntity() {
+ Child entity = new Child("myid", "myname");
+ EntityDocument document = EntityDocument.create(entity);
+
+ assertFalse(repository().exists(document));
+ assertEquals(0, document.cas());
+ EntityDocument stored = repository().upsert(document);
+ assertNotEquals(0, stored.cas());
+ assertEquals(document.content(), stored.content());
+
+ JsonDocument storedRaw = bucket().get(entity.getId());
+ assertEquals(entity.getName(), storedRaw.content().getString("name"));
+
+ EntityDocument found = repository().get(entity.getId(), Child.class);
+ assertEquals(found.cas(), stored.cas());
+ assertNotEquals(0, found.cas());
+
+ assertTrue(repository().exists(document));
+ }
+
static class EntityWithoutId {
}
@@ -149,4 +171,36 @@ static class EntityWithNoNStringId {
Date id = new Date();
}
+
+ public static abstract class Parent {
+ @Id
+ private String id;
+
+ Parent(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+ }
+
+ public static class Child extends Parent {
+ @Field
+ private String name;
+
+ public Child() {
+ super(null);
+ }
+
+ public Child(String id, String name) {
+ super(id);
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+
}
diff --git a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java
index 530bc0b8..fa26c260 100644
--- a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java
+++ b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java
@@ -813,11 +813,12 @@ public Observable extends D> call(Throwable throwable) {
@Override
public Observable query(final ViewQuery query) {
- Observable source = Observable.defer(new Func0>() {
+ Observable source = deferAndWatch(new Func1>() {
@Override
- public Observable call() {
+ public Observable call(Subscriber subscriber) {
final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(),
query.isDevelopment(), query.toQueryString(), query.getKeys(), bucket, username, password);
+ request.subscriber(subscriber);
return core.send(request);
}
});
@@ -841,11 +842,12 @@ public Observable query(final SearchQuery query) {
query.serverSideTimeout(environment().searchTimeout(), TimeUnit.MILLISECONDS);
}
- Observable source = Observable.defer(new Func0>() {
+ Observable source = deferAndWatch(new Func1>() {
@Override
- public Observable call() {
+ public Observable call(Subscriber subscriber) {
final SearchQueryRequest request =
- new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password);
+ new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password);
+ request.subscriber(subscriber);
return core.send(request);
}
});
@@ -872,11 +874,12 @@ public AsyncSearchQueryResult call(SearchQueryResponse response) {
@Override
public Observable query(final SpatialViewQuery query) {
- Observable source = Observable.defer(new Func0>() {
+ Observable source = deferAndWatch(new Func1>() {
@Override
- public Observable call() {
+ public Observable call(Subscriber subscriber) {
final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(),
- query.isDevelopment(), true, query.toString(), null, bucket, username, password);
+ query.isDevelopment(), true, query.toString(), null, bucket, username, password);
+ request.subscriber(subscriber);
return core.send(request);
}
});
@@ -2026,6 +2029,9 @@ public Observable extends DocumentFragment> call(Throwable throwable
.map(new Func1>() {
@Override
public DocumentFragment call(E element) {
+ if (element == null) {
+ throw new CASMismatchException();
+ }
return ResultMappingUtils.convertToSubDocumentResult(ResponseStatus.SUCCESS, mutationOperation, element);
}
});
diff --git a/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java b/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java
index a321fd12..dcc588a8 100644
--- a/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java
+++ b/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java
@@ -325,11 +325,11 @@ private static List assembleSeedNodes(ConnectionString connectionString,
*/
private static void seedNodesViaDnsSrv(ConnectionString connectionString,
CouchbaseEnvironment environment, List seedNodes) {
- if (connectionString.hosts().size() == 1) {
- InetSocketAddress lookupNode = connectionString.hosts().get(0);
+ if (connectionString.allHosts().size() == 1) {
+ InetSocketAddress lookupNode = connectionString.allHosts().get(0);
LOGGER.debug(
"Attempting to load DNS SRV records from {}.",
- connectionString.hosts().get(0)
+ connectionString.allHosts().get(0)
);
try {
diff --git a/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java b/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java
index 7e6dd5e7..e4f4c0ab 100644
--- a/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java
+++ b/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java
@@ -23,7 +23,7 @@
import com.couchbase.client.java.error.TranscodingException;
import com.couchbase.client.java.transcoder.TranscoderUtils;
import rx.Observable;
-import rx.functions.Func0;
+import rx.Subscriber;
import rx.functions.Func1;
import rx.functions.Func6;
@@ -31,6 +31,7 @@
import java.util.List;
import static com.couchbase.client.java.CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER;
+import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
public class AnalyticsQueryExecutor {
@@ -47,10 +48,13 @@ public AnalyticsQueryExecutor(ClusterFacade core, String bucket, String username
}
public Observable execute(final AnalyticsQuery query) {
- return Observable.defer(new Func0>() {
+ return deferAndWatch(new Func1>() {
@Override
- public Observable call() {
- return core.send(GenericAnalyticsRequest.jsonQuery(query.query().toString(), bucket, username, password));
+ public Observable call(final Subscriber subscriber) {
+ GenericAnalyticsRequest request = GenericAnalyticsRequest
+ .jsonQuery(query.query().toString(), bucket, username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
}
}).flatMap(new Func1>() {
@Override
diff --git a/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java b/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java
index fab37dee..13bc3918 100644
--- a/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java
+++ b/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java
@@ -31,6 +31,7 @@
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.error.FlushDisabledException;
import rx.Observable;
+import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
@@ -39,6 +40,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
+import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
import static com.couchbase.client.java.util.retry.RetryBuilder.any;
/**
@@ -118,8 +120,15 @@ private static Observable> createMarkerDocuments(final ClusterFacad
.from(FLUSH_MARKERS)
.flatMap(new Func1>() {
@Override
- public Observable call(String id) {
- return core.send(new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket));
+ public Observable call(final String id) {
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends UpsertResponse> call(final Subscriber subscriber) {
+ UpsertRequest request = new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.doOnNext(new Action1() {
@@ -152,8 +161,14 @@ public List call(UpsertResponse response) {
* @return an observable indicating if done (true) or polling needs to happen (false).
*/
private static Observable initiateFlush(final ClusterFacade core, final String bucket, final String username, final String password) {
- return core
- .send(new FlushRequest(bucket, username, password))
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable call(Subscriber subscriber) {
+ FlushRequest request = new FlushRequest(bucket, username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ })
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
.map(new Func1() {
@Override
@@ -182,8 +197,15 @@ private static Observable pollMarkerDocuments(final ClusterFacade core,
.from(FLUSH_MARKERS)
.flatMap(new Func1>() {
@Override
- public Observable call(String id) {
- return core.send(new GetRequest(id, bucket));
+ public Observable call(final String id) {
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends GetResponse> call(Subscriber subscriber) {
+ GetRequest request = new GetRequest(id, bucket);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.reduce(0, new Func2() {
diff --git a/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java b/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java
index 76bc8c21..6f305719 100644
--- a/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java
+++ b/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java
@@ -117,7 +117,15 @@ public Observable info() {
return Observable.defer(new Func0>() {
@Override
public Observable call() {
- return core.send(new BucketConfigRequest("/pools/default/buckets/", null, bucket, username, password));
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends BucketConfigResponse> call(Subscriber subscriber) {
+ BucketConfigRequest request = new BucketConfigRequest("/pools/default/buckets/",
+ null, bucket, username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -151,7 +159,14 @@ public Observable getDesignDocuments(final boolean development)
return Observable.defer(new Func0>() {
@Override
public Observable call() {
- return core.send(new GetDesignDocumentsRequest(bucket, username, password));
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends GetDesignDocumentsResponse> call(final Subscriber subscriber) {
+ GetDesignDocumentsRequest request = new GetDesignDocumentsRequest(bucket, username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
diff --git a/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java b/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java
index 94ba54a0..6d379828 100644
--- a/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java
+++ b/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java
@@ -33,11 +33,13 @@
import com.couchbase.client.java.error.CouchbaseOutOfMemoryException;
import com.couchbase.client.java.error.TemporaryFailureException;
import rx.Observable;
+import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.List;
+import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
/**
* Helper class to deal with reading from zero to N replicas and returning results.
@@ -70,15 +72,19 @@ private ReplicaReader() {}
public static Observable read(final ClusterFacade core, final String id,
final ReplicaMode type, final String bucket) {
return assembleRequests(core, id, type, bucket)
- .flatMap(new Func1>() {
- @Override
- public Observable call(BinaryRequest request) {
- return core
- .send(request)
- .filter(GetResponseFilter.INSTANCE)
- .onErrorResumeNext(GetResponseErrorHandler.INSTANCE);
- }
- });
+ .flatMap(new Func1>() {
+ @Override
+ public Observable call(final BinaryRequest request) {
+ Observable result = deferAndWatch(new Func1>() {
+ @Override
+ public Observable call(Subscriber subscriber) {
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ }).filter(GetResponseFilter.INSTANCE);
+ return result.onErrorResumeNext(GetResponseErrorHandler.INSTANCE);
+ }
+ });
}
/**
diff --git a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java
index 31d5ad95..db5e5417 100644
--- a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java
+++ b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java
@@ -18,6 +18,8 @@
import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.annotations.InterfaceStability;
+import com.couchbase.client.core.logging.CouchbaseLogger;
+import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.config.BucketsConfigRequest;
import com.couchbase.client.core.message.config.BucketsConfigResponse;
import com.couchbase.client.core.message.config.ClusterConfigRequest;
@@ -53,15 +55,19 @@
import com.couchbase.client.java.error.InvalidPasswordException;
import com.couchbase.client.java.error.TranscodingException;
import rx.Observable;
+import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Func1;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
import static com.couchbase.client.java.util.retry.RetryBuilder.any;
public class DefaultAsyncClusterManager implements AsyncClusterManager {
@@ -104,7 +110,14 @@ public Observable info() {
.flatMap(new Func1>() {
@Override
public Observable call(Boolean aBoolean) {
- return core.send(new ClusterConfigRequest(username, password));
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends ClusterConfigResponse> call(Subscriber subscriber) {
+ ClusterConfigRequest request = new ClusterConfigRequest(username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -138,7 +151,14 @@ public Observable getBuckets() {
.flatMap(new Func1>() {
@Override
public Observable call(Boolean aBoolean) {
- return core.send(new BucketsConfigRequest(username, password));
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends BucketsConfigResponse> call(Subscriber subscriber) {
+ BucketsConfigRequest request = new BucketsConfigRequest(username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -231,7 +251,14 @@ public Observable removeBucket(final String name) {
.flatMap(new Func1>() {
@Override
public Observable call(Boolean aBoolean) {
- return core.send(new RemoveBucketRequest(name, username, password));
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends RemoveBucketResponse> call(Subscriber subscriber) {
+ RemoveBucketRequest request = new RemoveBucketRequest(name, username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -258,7 +285,14 @@ public void call(Boolean exists) {
}).flatMap(new Func1>() {
@Override
public Observable call(Boolean exists) {
- return core.send(new InsertBucketRequest(payload, username, password));
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends InsertBucketResponse> call(Subscriber subscriber) {
+ InsertBucketRequest request = new InsertBucketRequest(payload, username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -288,7 +322,15 @@ public void call(Boolean exists) {
}).flatMap(new Func1>() {
@Override
public Observable call(Boolean exists) {
- return core.send(new UpdateBucketRequest(settings.name(), payload, username, password));
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends UpdateBucketResponse> call(Subscriber subscriber) {
+ UpdateBucketRequest request = new UpdateBucketRequest(
+ settings.name(), payload, username, password);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -307,48 +349,64 @@ public BucketSettings call(UpdateBucketResponse response) {
public Observable upsertUser(final AuthDomain domain, final String userid, final UserSettings userSettings) {
final String payload = getUserSettingsPayload(userSettings);
return ensureServiceEnabled()
- .flatMap(new Func1>() {
- @Override
- public Observable call(Boolean aBoolean) {
- return core.send(new UpsertUserRequest(username, password, domain.alias(), userid, payload));
- }
- })
- .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
- .map(new Func1() {
- @Override
- public Boolean call(UpsertUserResponse response) {
- if (!response.status().isSuccess()) {
- StringBuilder sb = new StringBuilder();
- sb.append("Could not update user: ");
- sb.append(response.status());
- if (response.message().length() > 0) {
- sb.append(", ");
- sb.append("msg: ");
- sb.append(response.message());
- }
- throw new CouchbaseException(sb.toString());
- }
- return true;
- }
- });
+ .flatMap(new Func1>() {
+ @Override
+ public Observable call(Boolean aBoolean) {
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends UpsertUserResponse> call(Subscriber subscriber) {
+ UpsertUserRequest request = new UpsertUserRequest(
+ username, password, domain.alias(), userid, payload);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
+ }
+ })
+ .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
+ .map(new Func1() {
+ @Override
+ public Boolean call(UpsertUserResponse response) {
+ if (!response.status().isSuccess()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Could not update user: ");
+ sb.append(response.status());
+ if (response.message().length() > 0) {
+ sb.append(", ");
+ sb.append("msg: ");
+ sb.append(response.message());
+ }
+ throw new CouchbaseException(sb.toString());
+ }
+ return true;
+ }
+ });
}
@Override
public Observable removeUser(final AuthDomain domain, final String userid) {
return ensureServiceEnabled()
- .flatMap(new Func1>() {
- @Override
- public Observable call(Boolean aBoolean) {
- return core.send(new RemoveUserRequest(username, password, domain.alias(), userid));
- }
- })
- .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
- .map(new Func1() {
- @Override
- public Boolean call(RemoveUserResponse response) {
- return response.status().isSuccess();
- }
- });
+ .flatMap(new Func1>() {
+ @Override
+ public Observable call(Boolean aBoolean) {
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends RemoveUserResponse> call(Subscriber subscriber) {
+ RemoveUserRequest request = new RemoveUserRequest(
+ username, password, domain.alias(), userid);
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
+ }
+ })
+ .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
+ .map(new Func1() {
+ @Override
+ public Boolean call(RemoveUserResponse response) {
+ return response.status().isSuccess();
+ }
+ });
}
@Override
@@ -362,11 +420,16 @@ public Observable getUser(final AuthDomain domain, final String userid) {
.flatMap(new Func1>() {
@Override
public Observable call(Boolean aBoolean) {
- if (userid == null || userid.isEmpty()) {
- return core.send(GetUsersRequest.usersFromDomain(username, password, domain.alias()));
- } else {
- return core.send(GetUsersRequest.user(username, password, domain.alias(), userid));
- }
+ final GetUsersRequest request = (userid == null || userid.isEmpty())
+ ? GetUsersRequest.usersFromDomain(username, password, domain.alias())
+ : GetUsersRequest.user(username, password, domain.alias(), userid);
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends GetUsersResponse> call(Subscriber subscriber) {
+ request.subscriber(subscriber);
+ return core.send(request);
+ }
+ });
}
})
.retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build())
@@ -543,44 +606,48 @@ public Observable call(ClusterInfo clusterInfo) {
});
}
+ Observable sendAddNodeRequest(final InetSocketAddress address) {
+ final NetworkAddress networkAddress = NetworkAddress.create(address.getAddress().getHostAddress());
+ return core.send(new AddNodeRequest(networkAddress))
+ .flatMap(new Func1>() {
+ @Override
+ public Observable call(AddNodeResponse addNodeResponse) {
+ if (!addNodeResponse.status().isSuccess()) {
+ throw new CouchbaseException("Could not enable ClusterManager service to function properly.");
+ }
+ int port = environment.sslEnabled() ? environment.bootstrapHttpSslPort() : environment.bootstrapHttpDirectPort();
+ return core.send(new AddServiceRequest(ServiceType.CONFIG, username, password, port, networkAddress));
+ }
+ })
+ .map(new Func1() {
+ @Override
+ public Boolean call(AddServiceResponse addServiceResponse) {
+ if (!addServiceResponse.status().isSuccess()) {
+ throw new CouchbaseException("Could not enable ClusterManager service to function properly.");
+ }
+ return true;
+ }
+ });
+ }
+
private Observable ensureServiceEnabled() {
if (connectionString.hosts().isEmpty()) {
return Observable.error(new IllegalStateException("No host found in the connection string! " + connectionString.toString()));
}
- return Observable
- .just(connectionString.hosts().get(0).getAddress().getHostAddress())
- .map(new Func1() {
- @Override
- public NetworkAddress call(String hostname) {
- return NetworkAddress.create(hostname);
- }
- })
- .flatMap(new Func1>() {
- @Override
- public Observable call(final NetworkAddress hostname) {
- return core
- .send(new AddNodeRequest(hostname))
- .flatMap(new Func1>() {
- @Override
- public Observable call(AddNodeResponse response) {
- int port = environment.sslEnabled()
- ? environment.bootstrapHttpSslPort() : environment.bootstrapHttpDirectPort();
- return core.send(new AddServiceRequest(ServiceType.CONFIG, username, password,
- port, hostname));
- }
- });
- }
- })
- .map(new Func1() {
- @Override
- public Boolean call(AddServiceResponse addServiceResponse) {
- if (!addServiceResponse.status().isSuccess()) {
- throw new CouchbaseException("Could not enable ClusterManager service to function properly.");
+ final AtomicInteger integer = new AtomicInteger(0);
+ return Observable.just(connectionString.hosts())
+ .flatMap(new Func1, Observable>() {
+ @Override
+ public Observable call(List inetSocketAddresses) {
+ int hostIndex = integer.getAndIncrement();
+ if (hostIndex >= connectionString.hosts().size()) {
+ integer.set(0);
+ return Observable.error(new CouchbaseException("Could not enable ClusterManager service to function properly."));
+ }
+ return sendAddNodeRequest(inetSocketAddresses.get(hostIndex));
}
- return true;
- }
- });
- }
+ });
+ }
}
\ No newline at end of file
diff --git a/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java b/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java
index 9e9ab985..4c734ab9 100644
--- a/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java
+++ b/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java
@@ -30,7 +30,11 @@
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.document.json.JsonValue;
import rx.Observable;
+import rx.Subscriber;
import rx.functions.Func0;
+import rx.functions.Func1;
+
+import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
/**
* A builder class to incrementally construct REST API requests and execute
@@ -234,12 +238,13 @@ public RestApiRequest asRequest() {
* @return an {@link Observable} of the result of the API call, which is a {@link RestApiResponse}.
*/
public Observable execute() {
- return Observable.defer(new Func0>() {
+ return deferAndWatch(new Func1>() {
@Override
- public Observable call() {
+ public Observable extends RestApiResponse> call(Subscriber subscriber) {
RestApiRequest apiRequest = asRequest();
LOGGER.debug("Executing Cluster API request {} on {}", apiRequest.method(), apiRequest.pathWithParameters());
- return core.send(asRequest());
+ apiRequest.subscriber(subscriber);
+ return core.send(apiRequest);
}
});
}
diff --git a/src/main/java/com/couchbase/client/java/query/N1qlParams.java b/src/main/java/com/couchbase/client/java/query/N1qlParams.java
index 47c5aaee..885a8a4a 100644
--- a/src/main/java/com/couchbase/client/java/query/N1qlParams.java
+++ b/src/main/java/com/couchbase/client/java/query/N1qlParams.java
@@ -64,6 +64,7 @@ public class N1qlParams implements Serializable {
private Map rawParams;
private boolean pretty;
private boolean readonly;
+ private N1qlProfile profile;
private final Map credentials;
@@ -142,6 +143,10 @@ public void injectParams(JsonObject queryJson) {
queryJson.put("readonly", true);
}
+ if (this.profile != null) {
+ queryJson.put("profile", this.profile.toString());
+ }
+
if (this.rawParams != null) {
for (Map.Entry entry : rawParams.entrySet()) {
queryJson.put(entry.getKey(), entry.getValue());
@@ -410,6 +415,18 @@ public N1qlParams pipelineCap(int pipelineCap) {
return this;
}
+ /**
+ * Specifies if there should be a profile section returned with the request results.
+ *
+ * @param profile the profile param {@link N1qlProfile}.
+ * @return this {@link N1qlParams} for chaining.
+ */
+ @InterfaceStability.Uncommitted
+ public N1qlParams profile(N1qlProfile profile) {
+ this.profile = profile;
+ return this;
+ }
+
/**
* Allows to specify an arbitrary, raw N1QL param.
*
@@ -480,6 +497,7 @@ public boolean equals(Object o) {
if (mutationState != null ? !mutationState.equals(that.mutationState) : that.mutationState != null)
return false;
if (!credentials.equals(that.credentials)) return false;
+ if (profile != that.profile) return false;
return rawParams != null ? rawParams.equals(that.rawParams) : that.rawParams == null;
}
@@ -500,6 +518,7 @@ public int hashCode() {
result = 31 * result + (adhoc ? 1 : 0);
result = 31 * result + (pretty ? 1 : 0);
result = 31 * result + (readonly ? 1 : 0);
+ result = 31 * result + (profile != null ? profile.hashCode() : 0);
return result;
}
@@ -521,6 +540,9 @@ public String toString() {
sb.append(", rawParams=").append(rawParams);
if (!credentials.isEmpty())
sb.append(", credentials=").append(credentials.size());
+ if (profile != null) {
+ sb.append(", profile=").append(profile.toString());
+ }
sb.append('}');
return sb.toString();
}
diff --git a/src/main/java/com/couchbase/client/java/query/N1qlProfile.java b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java
new file mode 100644
index 00000000..b66357f9
--- /dev/null
+++ b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (c) 2018 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.couchbase.client.java.query;
+
+import com.couchbase.client.core.annotations.InterfaceAudience;
+import com.couchbase.client.core.annotations.InterfaceStability;
+
+/**
+ * N1ql profile options
+ */
+@InterfaceStability.Uncommitted
+@InterfaceAudience.Public
+public enum N1qlProfile {
+
+ /**
+ * No profiling information is added to the query response.
+ */
+ OFF {
+ @Override
+ public String toString() {
+ return "off";
+ }
+ },
+
+ /**
+ * The query response includes a profile section with stats and details
+ * about various phases of the query plan and execution.
+ * Three phase times will be included in the system:active_requests and
+ * system:completed_requests monitoring keyspaces.
+ */
+ PHASES {
+ @Override
+ public String toString() {
+ return "phases";
+ }
+ },
+
+ /**
+ * Besides the phase times, the profile section of the query response document will
+ * include a full query plan with timing and information about the number of processed
+ * documents at each phase. This information will be included in the system:active_requests
+ * and system:completed_requests keyspaces.
+ */
+ TIMINGS {
+ @Override
+ public String toString() {
+ return "timings";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java b/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java
index e3c4de6b..bdcc9ef0 100644
--- a/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java
+++ b/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java
@@ -53,6 +53,7 @@
import com.couchbase.client.java.transcoder.TranscoderUtils;
import com.couchbase.client.java.util.LRUCache;
import rx.Observable;
+import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.functions.Func0;
import rx.functions.Func1;
@@ -68,6 +69,7 @@
import java.util.Map;
import static com.couchbase.client.java.CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER;
+import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch;
/**
* A class used to execute various N1QL queries.
@@ -186,10 +188,12 @@ public Observable execute(final N1qlQuery query) {
* @return a result containing all found rows and additional information.
*/
protected Observable executeQuery(final N1qlQuery query) {
- return Observable.defer(new Func0>() {
+ return deferAndWatch(new Func1>() {
@Override
- public Observable call() {
- return core.send(createN1qlRequest(query, bucket, username, password, null));
+ public Observable call(Subscriber subscriber) {
+ GenericQueryRequest request = createN1qlRequest(query, bucket, username, password, null);
+ request.subscriber(subscriber);
+ return core.send(request);
}
}).flatMap(new Func1>() {
@Override
@@ -439,10 +443,12 @@ protected Observable prepare(Statement statement) {
if (isEncodedPlanEnabled()) {
//we'll include the encodedPlan in each EXECUTE, so we don't broadcast during PREPARE
- source = Observable.defer(new Func0>() {
+ source = deferAndWatch(new Func1>() {
@Override
- public Observable call() {
- return core.send(createN1qlRequest(query, bucket, username, password, null));
+ public Observable extends GenericQueryResponse> call(Subscriber subscriber) {
+ GenericQueryRequest request = createN1qlRequest(query, bucket, username, password, null);
+ request.subscriber(subscriber);
+ return core.send(request);
}
});
} else {
@@ -470,8 +476,14 @@ public Boolean call(NodeInfo nodeInfo) {
public Observable call(NodeInfo nodeInfo) {
try {
InetAddress hostname = InetAddress.getByName(nodeInfo.hostname().address());
- GenericQueryRequest req = createN1qlRequest(query, bucket, username, password, hostname);
- return core.send(req);
+ final GenericQueryRequest req = createN1qlRequest(query, bucket, username, password, hostname);
+ return deferAndWatch(new Func1>() {
+ @Override
+ public Observable extends GenericQueryResponse> call(Subscriber subscriber) {
+ req.subscriber(subscriber);
+ return core.send(req);
+ }
+ });
} catch (UnknownHostException e) {
return Observable.error(e);
}
diff --git a/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java b/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java
index d943997f..e381fb29 100644
--- a/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java
+++ b/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java
@@ -17,6 +17,7 @@
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
/**
@@ -30,11 +31,11 @@ public class ReflectionBasedEntityMetadata implements EntityMetadata {
private final List properties;
private final PropertyMetadata idProperty;
- public ReflectionBasedEntityMetadata(Class> sourceEntity) {
+ public ReflectionBasedEntityMetadata(final Class> sourceEntity) {
properties = new ArrayList();
PropertyMetadata idProperty = null;
- for (Field field : sourceEntity.getDeclaredFields()) {
+ for (Field field : getAllDeclaredFields(sourceEntity)) {
PropertyMetadata property = new ReflectionBasedPropertyMetadata(field);
properties.add(property);
if (property.isId()) {
@@ -45,6 +46,24 @@ public ReflectionBasedEntityMetadata(Class> sourceEntity) {
this.idProperty = idProperty;
}
+ /**
+ * Helper method to grab all the declared fields from the given class but also
+ * from its inherited parents!
+ *
+ * @param sourceEntity the source entity to start from.
+ * @return an iterable of found fields.
+ */
+ private static List getAllDeclaredFields(final Class> sourceEntity) {
+ List fields = new ArrayList();
+ Class> clazz = sourceEntity;
+ while (clazz != null) {
+ Field[] f = clazz.getDeclaredFields();
+ fields.addAll(Arrays.asList(f));
+ clazz = clazz.getSuperclass();
+ }
+ return fields;
+ }
+
@Override
public List properties() {
return properties;
diff --git a/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java b/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java
index 736c4cfe..e08a45c0 100644
--- a/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java
+++ b/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java
@@ -1038,9 +1038,9 @@ protected Observable> doMultiMutate() {
}
}
- Observable> mutations = Observable.defer(new Func0>() {
+ Observable> mutations = deferAndWatch(new Func1>() {
@Override
- public Observable call() {
+ public Observable call(Subscriber s) {
List bufList = new ArrayList(mutationSpecs.size());
final List commands = new ArrayList(mutationSpecs.size());
@@ -1063,15 +1063,11 @@ public Observable call() {
}
}
}
- return Observable.from(commands);
- }
- }).toList()
- .flatMap(new Func1, Observable>(){
- @Override
- public Observable call(List mutationCommands) {
- return core.send(new SubMultiMutationRequest(docId, bucketName,
+ SubMultiMutationRequest request = new SubMultiMutationRequest(docId, bucketName,
expiry, cas, SubMultiMutationDocOptionsBuilder.builder().upsertDocument(upsertDocument).insertDocument(insertDocument),
- mutationCommands));
+ commands);
+ request.subscriber(s);
+ return core.send(request);
}
}).flatMap(new Func1>>() {
@Override
diff --git a/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java b/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java
index 34e9c0f1..e4bf9b4f 100644
--- a/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java
+++ b/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java
@@ -145,6 +145,7 @@ public Observable n1qlToRawCustom(final N1qlQuery query, final Func1 call(Subscriber s) {
RawQueryRequest request = RawQueryRequest.jsonQuery(query.n1ql().toString(), bucket, username, password);
+ request.subscriber(s);
return core.send(request);
}
}).map(new Func1() {
@@ -217,10 +218,11 @@ public Observable ftsToRawCustom(final SearchQuery query, final Func1>() {
+ return deferAndWatch(new Func1>() {
@Override
- public Observable call() {
+ public Observable extends SearchQueryResponse> call(Subscriber subscriber) {
SearchQueryRequest request = new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password);
+ request.subscriber(subscriber);
return core.send(request);
}
}).map(new Func1() {