diff --git a/README.md b/README.md index 2e3cb2f6..1261fba2 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ The easiest way is to download the jar as well as its transitive dependencies (o com.couchbase.client java-client - 2.5.5 + 2.5.9 ``` diff --git a/pom.xml b/pom.xml index 1b7bd017..10a1e06d 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.6-SNAPSHOT + 2.5.10-SNAPSHOT jar Couchbase Java SDK @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.6-SNAPSHOT + 1.5.10-SNAPSHOT 4.12 1.10.19 1.7.7 diff --git a/src/integration/java/com/couchbase/client/java/ConnectionTest.java b/src/integration/java/com/couchbase/client/java/ConnectionTest.java index 444e0f4c..7eadf1a3 100644 --- a/src/integration/java/com/couchbase/client/java/ConnectionTest.java +++ b/src/integration/java/com/couchbase/client/java/ConnectionTest.java @@ -23,6 +23,7 @@ import static org.junit.Assume.assumeTrue; import com.couchbase.client.java.cluster.ClusterManager; +import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import com.couchbase.client.java.error.BucketDoesNotExistException; import com.couchbase.client.java.error.InvalidPasswordException; import com.couchbase.client.java.util.TestProperties; @@ -75,6 +76,26 @@ public void shouldThrowConfigurationExceptionForWrongBucketPassword() { cluster.openBucket(TestProperties.bucket(), "completelyWrongPassword"); } + @Test + public void shouldBootstrapWithBadHost() { + Cluster cluster = CouchbaseCluster.create("badnode", TestProperties.seedNode()); + cluster.openBucket(TestProperties.bucket(), TestProperties.password()); + } + + @Test + public void shouldProvideClusterInfoWithBadHostInBootstrapList() { + Cluster cluster = CouchbaseCluster.create("x.y.z", TestProperties.seedNode()); + cluster.authenticate(TestProperties.adminName(), TestProperties.adminPassword()); + cluster.clusterManager().info(); + } + + @Test(expected = NullPointerException.class) + public void shouldNotCheckReverseLookupWhenDNSSRVEnabled() throws Exception { + Cluster cluster = CouchbaseCluster.create(DefaultCouchbaseEnvironment.builder().dnsSrvEnabled(true).build(), "x.y.z"); + cluster.authenticate(TestProperties.adminName(), TestProperties.adminPassword()); + cluster.clusterManager().info(); + } + @Test public void shouldCacheBucketReference() { Cluster cluster = CouchbaseCluster.create(TestProperties.seedNode()); diff --git a/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java b/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java index c0253042..419658d6 100644 --- a/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java +++ b/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java @@ -340,4 +340,22 @@ public void shouldWorkWithPrettyFalse() { assertNotNull(result.allRows()); assertNotNull(result.errors()); } + + @Test + public void shouldWorkWithProfileInfoEnabled() { + ctx.ignoreIfClusterUnder(Version.parseVersion("4.5.1")); + N1qlQuery query = N1qlQuery.simple( + select("*").fromCurrentBucket().limit(1), + N1qlParams.build().profile(N1qlProfile.TIMINGS).consistency(CONSISTENCY) + ); + + N1qlQueryResult result = ctx.bucket().query(query); + assertEquals(1, result.allRows().size()); + assertTrue(result.parseSuccess()); + assertTrue(result.finalSuccess()); + assertNotNull(result.info()); + assertNotNull(result.allRows()); + assertNotNull(result.errors()); + assertTrue(result.profileInfo().size() > 0); + } } diff --git a/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java b/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java index 4dd74757..364a727b 100644 --- a/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java +++ b/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java @@ -19,7 +19,12 @@ import com.couchbase.client.java.PersistTo; import com.couchbase.client.java.error.subdoc.PathInvalidException; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.java.bucket.BucketType; @@ -34,6 +39,8 @@ import com.couchbase.client.java.util.CouchbaseTestContext; import com.couchbase.client.java.util.features.CouchbaseFeature; import org.junit.*; +import rx.functions.Action0; +import rx.functions.Action1; public class DataStructuresTest { @@ -357,4 +364,51 @@ public void testSetSizeOnNonExistentDocument() { ctx.bucket().setSize("dssetRandom"); } + + @Test + public void testMultiThreadQueuePop() throws Exception { + ctx.bucket().queuePush("testMultiThreadQueuePop", 1, MutationOptionBuilder.builder().createDocument(true)); + + final CountDownLatch latch = new CountDownLatch(10); + ExecutorService pool = Executors.newFixedThreadPool(10); + final AtomicInteger atomicInteger = new AtomicInteger(); + + for (int i=0; i < 10; i++) { + pool.submit(new Runnable() { + @Override + public void run() { + ctx.bucket().async().queuePop("testMultiThreadQueuePop", Integer.class) + .subscribe( + new Action1() { + @Override + public void call(Integer val) { + if (val == 1) { + atomicInteger.incrementAndGet(); + } else { + Assert.assertEquals(val, null); + } + } + }, + new Action1() { + @Override + public void call(Throwable throwable) { + //ignore + latch.countDown(); + } + }, + new Action0() { + @Override + public void call() { + latch.countDown(); + } + } + + ); + } + }); + } + latch.await(); + Assert.assertEquals(1, atomicInteger.get()); + } + } \ No newline at end of file diff --git a/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java b/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java index 56348564..459ca538 100644 --- a/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java +++ b/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java @@ -17,6 +17,7 @@ import com.couchbase.client.java.document.EntityDocument; import com.couchbase.client.java.document.JsonDocument; +import com.couchbase.client.java.repository.annotation.Field; import com.couchbase.client.java.repository.annotation.Id; import com.couchbase.client.java.repository.mapping.RepositoryMappingException; import com.couchbase.client.java.util.ClusterDependentTest; @@ -136,6 +137,27 @@ public void shouldFailWithNonStringIdProperty() { repository().upsert(EntityDocument.create(new EntityWithNoNStringId())); } + @Test + public void shouldUpsertExtendedEntity() { + Child entity = new Child("myid", "myname"); + EntityDocument document = EntityDocument.create(entity); + + assertFalse(repository().exists(document)); + assertEquals(0, document.cas()); + EntityDocument stored = repository().upsert(document); + assertNotEquals(0, stored.cas()); + assertEquals(document.content(), stored.content()); + + JsonDocument storedRaw = bucket().get(entity.getId()); + assertEquals(entity.getName(), storedRaw.content().getString("name")); + + EntityDocument found = repository().get(entity.getId(), Child.class); + assertEquals(found.cas(), stored.cas()); + assertNotEquals(0, found.cas()); + + assertTrue(repository().exists(document)); + } + static class EntityWithoutId { } @@ -149,4 +171,36 @@ static class EntityWithNoNStringId { Date id = new Date(); } + + public static abstract class Parent { + @Id + private String id; + + Parent(String id) { + this.id = id; + } + + public String getId() { + return id; + } + } + + public static class Child extends Parent { + @Field + private String name; + + public Child() { + super(null); + } + + public Child(String id, String name) { + super(id); + this.name = name; + } + + public String getName() { + return name; + } + } + } diff --git a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java index 530bc0b8..fa26c260 100644 --- a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java +++ b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java @@ -813,11 +813,12 @@ public Observable call(Throwable throwable) { @Override public Observable query(final ViewQuery query) { - Observable source = Observable.defer(new Func0>() { + Observable source = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(), query.isDevelopment(), query.toQueryString(), query.getKeys(), bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }); @@ -841,11 +842,12 @@ public Observable query(final SearchQuery query) { query.serverSideTimeout(environment().searchTimeout(), TimeUnit.MILLISECONDS); } - Observable source = Observable.defer(new Func0>() { + Observable source = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { final SearchQueryRequest request = - new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password); + new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }); @@ -872,11 +874,12 @@ public AsyncSearchQueryResult call(SearchQueryResponse response) { @Override public Observable query(final SpatialViewQuery query) { - Observable source = Observable.defer(new Func0>() { + Observable source = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(), - query.isDevelopment(), true, query.toString(), null, bucket, username, password); + query.isDevelopment(), true, query.toString(), null, bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }); @@ -2026,6 +2029,9 @@ public Observable> call(Throwable throwable .map(new Func1>() { @Override public DocumentFragment call(E element) { + if (element == null) { + throw new CASMismatchException(); + } return ResultMappingUtils.convertToSubDocumentResult(ResponseStatus.SUCCESS, mutationOperation, element); } }); diff --git a/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java b/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java index a321fd12..dcc588a8 100644 --- a/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java +++ b/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java @@ -325,11 +325,11 @@ private static List assembleSeedNodes(ConnectionString connectionString, */ private static void seedNodesViaDnsSrv(ConnectionString connectionString, CouchbaseEnvironment environment, List seedNodes) { - if (connectionString.hosts().size() == 1) { - InetSocketAddress lookupNode = connectionString.hosts().get(0); + if (connectionString.allHosts().size() == 1) { + InetSocketAddress lookupNode = connectionString.allHosts().get(0); LOGGER.debug( "Attempting to load DNS SRV records from {}.", - connectionString.hosts().get(0) + connectionString.allHosts().get(0) ); try { diff --git a/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java b/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java index 7e6dd5e7..e4f4c0ab 100644 --- a/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java +++ b/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java @@ -23,7 +23,7 @@ import com.couchbase.client.java.error.TranscodingException; import com.couchbase.client.java.transcoder.TranscoderUtils; import rx.Observable; -import rx.functions.Func0; +import rx.Subscriber; import rx.functions.Func1; import rx.functions.Func6; @@ -31,6 +31,7 @@ import java.util.List; import static com.couchbase.client.java.CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; public class AnalyticsQueryExecutor { @@ -47,10 +48,13 @@ public AnalyticsQueryExecutor(ClusterFacade core, String bucket, String username } public Observable execute(final AnalyticsQuery query) { - return Observable.defer(new Func0>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { - return core.send(GenericAnalyticsRequest.jsonQuery(query.query().toString(), bucket, username, password)); + public Observable call(final Subscriber subscriber) { + GenericAnalyticsRequest request = GenericAnalyticsRequest + .jsonQuery(query.query().toString(), bucket, username, password); + request.subscriber(subscriber); + return core.send(request); } }).flatMap(new Func1>() { @Override diff --git a/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java b/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java index fab37dee..13bc3918 100644 --- a/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java +++ b/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java @@ -31,6 +31,7 @@ import com.couchbase.client.deps.io.netty.util.CharsetUtil; import com.couchbase.client.java.error.FlushDisabledException; import rx.Observable; +import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; @@ -39,6 +40,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; import static com.couchbase.client.java.util.retry.RetryBuilder.any; /** @@ -118,8 +120,15 @@ private static Observable> createMarkerDocuments(final ClusterFacad .from(FLUSH_MARKERS) .flatMap(new Func1>() { @Override - public Observable call(String id) { - return core.send(new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket)); + public Observable call(final String id) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(final Subscriber subscriber) { + UpsertRequest request = new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .doOnNext(new Action1() { @@ -152,8 +161,14 @@ public List call(UpsertResponse response) { * @return an observable indicating if done (true) or polling needs to happen (false). */ private static Observable initiateFlush(final ClusterFacade core, final String bucket, final String username, final String password) { - return core - .send(new FlushRequest(bucket, username, password)) + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + FlushRequest request = new FlushRequest(bucket, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) .map(new Func1() { @Override @@ -182,8 +197,15 @@ private static Observable pollMarkerDocuments(final ClusterFacade core, .from(FLUSH_MARKERS) .flatMap(new Func1>() { @Override - public Observable call(String id) { - return core.send(new GetRequest(id, bucket)); + public Observable call(final String id) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + GetRequest request = new GetRequest(id, bucket); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .reduce(0, new Func2() { diff --git a/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java b/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java index 76bc8c21..6f305719 100644 --- a/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java +++ b/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java @@ -117,7 +117,15 @@ public Observable info() { return Observable.defer(new Func0>() { @Override public Observable call() { - return core.send(new BucketConfigRequest("/pools/default/buckets/", null, bucket, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + BucketConfigRequest request = new BucketConfigRequest("/pools/default/buckets/", + null, bucket, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -151,7 +159,14 @@ public Observable getDesignDocuments(final boolean development) return Observable.defer(new Func0>() { @Override public Observable call() { - return core.send(new GetDesignDocumentsRequest(bucket, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(final Subscriber subscriber) { + GetDesignDocumentsRequest request = new GetDesignDocumentsRequest(bucket, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) diff --git a/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java b/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java index 94ba54a0..6d379828 100644 --- a/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java +++ b/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java @@ -33,11 +33,13 @@ import com.couchbase.client.java.error.CouchbaseOutOfMemoryException; import com.couchbase.client.java.error.TemporaryFailureException; import rx.Observable; +import rx.Subscriber; import rx.functions.Func0; import rx.functions.Func1; import java.util.ArrayList; import java.util.List; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; /** * Helper class to deal with reading from zero to N replicas and returning results. @@ -70,15 +72,19 @@ private ReplicaReader() {} public static Observable read(final ClusterFacade core, final String id, final ReplicaMode type, final String bucket) { return assembleRequests(core, id, type, bucket) - .flatMap(new Func1>() { - @Override - public Observable call(BinaryRequest request) { - return core - .send(request) - .filter(GetResponseFilter.INSTANCE) - .onErrorResumeNext(GetResponseErrorHandler.INSTANCE); - } - }); + .flatMap(new Func1>() { + @Override + public Observable call(final BinaryRequest request) { + Observable result = deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + request.subscriber(subscriber); + return core.send(request); + } + }).filter(GetResponseFilter.INSTANCE); + return result.onErrorResumeNext(GetResponseErrorHandler.INSTANCE); + } + }); } /** diff --git a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java index 31d5ad95..db5e5417 100644 --- a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java +++ b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java @@ -18,6 +18,8 @@ import com.couchbase.client.core.ClusterFacade; import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.core.annotations.InterfaceStability; +import com.couchbase.client.core.logging.CouchbaseLogger; +import com.couchbase.client.core.logging.CouchbaseLoggerFactory; import com.couchbase.client.core.message.config.BucketsConfigRequest; import com.couchbase.client.core.message.config.BucketsConfigResponse; import com.couchbase.client.core.message.config.ClusterConfigRequest; @@ -53,15 +55,19 @@ import com.couchbase.client.java.error.InvalidPasswordException; import com.couchbase.client.java.error.TranscodingException; import rx.Observable; +import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; import static com.couchbase.client.java.util.retry.RetryBuilder.any; public class DefaultAsyncClusterManager implements AsyncClusterManager { @@ -104,7 +110,14 @@ public Observable info() { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - return core.send(new ClusterConfigRequest(username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + ClusterConfigRequest request = new ClusterConfigRequest(username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -138,7 +151,14 @@ public Observable getBuckets() { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - return core.send(new BucketsConfigRequest(username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + BucketsConfigRequest request = new BucketsConfigRequest(username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -231,7 +251,14 @@ public Observable removeBucket(final String name) { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - return core.send(new RemoveBucketRequest(name, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + RemoveBucketRequest request = new RemoveBucketRequest(name, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -258,7 +285,14 @@ public void call(Boolean exists) { }).flatMap(new Func1>() { @Override public Observable call(Boolean exists) { - return core.send(new InsertBucketRequest(payload, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + InsertBucketRequest request = new InsertBucketRequest(payload, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -288,7 +322,15 @@ public void call(Boolean exists) { }).flatMap(new Func1>() { @Override public Observable call(Boolean exists) { - return core.send(new UpdateBucketRequest(settings.name(), payload, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + UpdateBucketRequest request = new UpdateBucketRequest( + settings.name(), payload, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -307,48 +349,64 @@ public BucketSettings call(UpdateBucketResponse response) { public Observable upsertUser(final AuthDomain domain, final String userid, final UserSettings userSettings) { final String payload = getUserSettingsPayload(userSettings); return ensureServiceEnabled() - .flatMap(new Func1>() { - @Override - public Observable call(Boolean aBoolean) { - return core.send(new UpsertUserRequest(username, password, domain.alias(), userid, payload)); - } - }) - .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) - .map(new Func1() { - @Override - public Boolean call(UpsertUserResponse response) { - if (!response.status().isSuccess()) { - StringBuilder sb = new StringBuilder(); - sb.append("Could not update user: "); - sb.append(response.status()); - if (response.message().length() > 0) { - sb.append(", "); - sb.append("msg: "); - sb.append(response.message()); - } - throw new CouchbaseException(sb.toString()); - } - return true; - } - }); + .flatMap(new Func1>() { + @Override + public Observable call(Boolean aBoolean) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + UpsertUserRequest request = new UpsertUserRequest( + username, password, domain.alias(), userid, payload); + request.subscriber(subscriber); + return core.send(request); + } + }); + } + }) + .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) + .map(new Func1() { + @Override + public Boolean call(UpsertUserResponse response) { + if (!response.status().isSuccess()) { + StringBuilder sb = new StringBuilder(); + sb.append("Could not update user: "); + sb.append(response.status()); + if (response.message().length() > 0) { + sb.append(", "); + sb.append("msg: "); + sb.append(response.message()); + } + throw new CouchbaseException(sb.toString()); + } + return true; + } + }); } @Override public Observable removeUser(final AuthDomain domain, final String userid) { return ensureServiceEnabled() - .flatMap(new Func1>() { - @Override - public Observable call(Boolean aBoolean) { - return core.send(new RemoveUserRequest(username, password, domain.alias(), userid)); - } - }) - .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) - .map(new Func1() { - @Override - public Boolean call(RemoveUserResponse response) { - return response.status().isSuccess(); - } - }); + .flatMap(new Func1>() { + @Override + public Observable call(Boolean aBoolean) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + RemoveUserRequest request = new RemoveUserRequest( + username, password, domain.alias(), userid); + request.subscriber(subscriber); + return core.send(request); + } + }); + } + }) + .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) + .map(new Func1() { + @Override + public Boolean call(RemoveUserResponse response) { + return response.status().isSuccess(); + } + }); } @Override @@ -362,11 +420,16 @@ public Observable getUser(final AuthDomain domain, final String userid) { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - if (userid == null || userid.isEmpty()) { - return core.send(GetUsersRequest.usersFromDomain(username, password, domain.alias())); - } else { - return core.send(GetUsersRequest.user(username, password, domain.alias(), userid)); - } + final GetUsersRequest request = (userid == null || userid.isEmpty()) + ? GetUsersRequest.usersFromDomain(username, password, domain.alias()) + : GetUsersRequest.user(username, password, domain.alias(), userid); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -543,44 +606,48 @@ public Observable call(ClusterInfo clusterInfo) { }); } + Observable sendAddNodeRequest(final InetSocketAddress address) { + final NetworkAddress networkAddress = NetworkAddress.create(address.getAddress().getHostAddress()); + return core.send(new AddNodeRequest(networkAddress)) + .flatMap(new Func1>() { + @Override + public Observable call(AddNodeResponse addNodeResponse) { + if (!addNodeResponse.status().isSuccess()) { + throw new CouchbaseException("Could not enable ClusterManager service to function properly."); + } + int port = environment.sslEnabled() ? environment.bootstrapHttpSslPort() : environment.bootstrapHttpDirectPort(); + return core.send(new AddServiceRequest(ServiceType.CONFIG, username, password, port, networkAddress)); + } + }) + .map(new Func1() { + @Override + public Boolean call(AddServiceResponse addServiceResponse) { + if (!addServiceResponse.status().isSuccess()) { + throw new CouchbaseException("Could not enable ClusterManager service to function properly."); + } + return true; + } + }); + } + private Observable ensureServiceEnabled() { if (connectionString.hosts().isEmpty()) { return Observable.error(new IllegalStateException("No host found in the connection string! " + connectionString.toString())); } - return Observable - .just(connectionString.hosts().get(0).getAddress().getHostAddress()) - .map(new Func1() { - @Override - public NetworkAddress call(String hostname) { - return NetworkAddress.create(hostname); - } - }) - .flatMap(new Func1>() { - @Override - public Observable call(final NetworkAddress hostname) { - return core - .send(new AddNodeRequest(hostname)) - .flatMap(new Func1>() { - @Override - public Observable call(AddNodeResponse response) { - int port = environment.sslEnabled() - ? environment.bootstrapHttpSslPort() : environment.bootstrapHttpDirectPort(); - return core.send(new AddServiceRequest(ServiceType.CONFIG, username, password, - port, hostname)); - } - }); - } - }) - .map(new Func1() { - @Override - public Boolean call(AddServiceResponse addServiceResponse) { - if (!addServiceResponse.status().isSuccess()) { - throw new CouchbaseException("Could not enable ClusterManager service to function properly."); + final AtomicInteger integer = new AtomicInteger(0); + return Observable.just(connectionString.hosts()) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(List inetSocketAddresses) { + int hostIndex = integer.getAndIncrement(); + if (hostIndex >= connectionString.hosts().size()) { + integer.set(0); + return Observable.error(new CouchbaseException("Could not enable ClusterManager service to function properly.")); + } + return sendAddNodeRequest(inetSocketAddresses.get(hostIndex)); } - return true; - } - }); - } + }); + } } \ No newline at end of file diff --git a/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java b/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java index 9e9ab985..4c734ab9 100644 --- a/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java +++ b/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java @@ -30,7 +30,11 @@ import com.couchbase.client.java.document.json.JsonObject; import com.couchbase.client.java.document.json.JsonValue; import rx.Observable; +import rx.Subscriber; import rx.functions.Func0; +import rx.functions.Func1; + +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; /** * A builder class to incrementally construct REST API requests and execute @@ -234,12 +238,13 @@ public RestApiRequest asRequest() { * @return an {@link Observable} of the result of the API call, which is a {@link RestApiResponse}. */ public Observable execute() { - return Observable.defer(new Func0>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { RestApiRequest apiRequest = asRequest(); LOGGER.debug("Executing Cluster API request {} on {}", apiRequest.method(), apiRequest.pathWithParameters()); - return core.send(asRequest()); + apiRequest.subscriber(subscriber); + return core.send(apiRequest); } }); } diff --git a/src/main/java/com/couchbase/client/java/query/N1qlParams.java b/src/main/java/com/couchbase/client/java/query/N1qlParams.java index 47c5aaee..885a8a4a 100644 --- a/src/main/java/com/couchbase/client/java/query/N1qlParams.java +++ b/src/main/java/com/couchbase/client/java/query/N1qlParams.java @@ -64,6 +64,7 @@ public class N1qlParams implements Serializable { private Map rawParams; private boolean pretty; private boolean readonly; + private N1qlProfile profile; private final Map credentials; @@ -142,6 +143,10 @@ public void injectParams(JsonObject queryJson) { queryJson.put("readonly", true); } + if (this.profile != null) { + queryJson.put("profile", this.profile.toString()); + } + if (this.rawParams != null) { for (Map.Entry entry : rawParams.entrySet()) { queryJson.put(entry.getKey(), entry.getValue()); @@ -410,6 +415,18 @@ public N1qlParams pipelineCap(int pipelineCap) { return this; } + /** + * Specifies if there should be a profile section returned with the request results. + * + * @param profile the profile param {@link N1qlProfile}. + * @return this {@link N1qlParams} for chaining. + */ + @InterfaceStability.Uncommitted + public N1qlParams profile(N1qlProfile profile) { + this.profile = profile; + return this; + } + /** * Allows to specify an arbitrary, raw N1QL param. * @@ -480,6 +497,7 @@ public boolean equals(Object o) { if (mutationState != null ? !mutationState.equals(that.mutationState) : that.mutationState != null) return false; if (!credentials.equals(that.credentials)) return false; + if (profile != that.profile) return false; return rawParams != null ? rawParams.equals(that.rawParams) : that.rawParams == null; } @@ -500,6 +518,7 @@ public int hashCode() { result = 31 * result + (adhoc ? 1 : 0); result = 31 * result + (pretty ? 1 : 0); result = 31 * result + (readonly ? 1 : 0); + result = 31 * result + (profile != null ? profile.hashCode() : 0); return result; } @@ -521,6 +540,9 @@ public String toString() { sb.append(", rawParams=").append(rawParams); if (!credentials.isEmpty()) sb.append(", credentials=").append(credentials.size()); + if (profile != null) { + sb.append(", profile=").append(profile.toString()); + } sb.append('}'); return sb.toString(); } diff --git a/src/main/java/com/couchbase/client/java/query/N1qlProfile.java b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java new file mode 100644 index 00000000..b66357f9 --- /dev/null +++ b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) 2018 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.couchbase.client.java.query; + +import com.couchbase.client.core.annotations.InterfaceAudience; +import com.couchbase.client.core.annotations.InterfaceStability; + +/** + * N1ql profile options + */ +@InterfaceStability.Uncommitted +@InterfaceAudience.Public +public enum N1qlProfile { + + /** + * No profiling information is added to the query response. + */ + OFF { + @Override + public String toString() { + return "off"; + } + }, + + /** + * The query response includes a profile section with stats and details + * about various phases of the query plan and execution. + * Three phase times will be included in the system:active_requests and + * system:completed_requests monitoring keyspaces. + */ + PHASES { + @Override + public String toString() { + return "phases"; + } + }, + + /** + * Besides the phase times, the profile section of the query response document will + * include a full query plan with timing and information about the number of processed + * documents at each phase. This information will be included in the system:active_requests + * and system:completed_requests keyspaces. + */ + TIMINGS { + @Override + public String toString() { + return "timings"; + } + } +} \ No newline at end of file diff --git a/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java b/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java index e3c4de6b..bdcc9ef0 100644 --- a/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java +++ b/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java @@ -53,6 +53,7 @@ import com.couchbase.client.java.transcoder.TranscoderUtils; import com.couchbase.client.java.util.LRUCache; import rx.Observable; +import rx.Subscriber; import rx.exceptions.CompositeException; import rx.functions.Func0; import rx.functions.Func1; @@ -68,6 +69,7 @@ import java.util.Map; import static com.couchbase.client.java.CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; /** * A class used to execute various N1QL queries. @@ -186,10 +188,12 @@ public Observable execute(final N1qlQuery query) { * @return a result containing all found rows and additional information. */ protected Observable executeQuery(final N1qlQuery query) { - return Observable.defer(new Func0>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { - return core.send(createN1qlRequest(query, bucket, username, password, null)); + public Observable call(Subscriber subscriber) { + GenericQueryRequest request = createN1qlRequest(query, bucket, username, password, null); + request.subscriber(subscriber); + return core.send(request); } }).flatMap(new Func1>() { @Override @@ -439,10 +443,12 @@ protected Observable prepare(Statement statement) { if (isEncodedPlanEnabled()) { //we'll include the encodedPlan in each EXECUTE, so we don't broadcast during PREPARE - source = Observable.defer(new Func0>() { + source = deferAndWatch(new Func1>() { @Override - public Observable call() { - return core.send(createN1qlRequest(query, bucket, username, password, null)); + public Observable call(Subscriber subscriber) { + GenericQueryRequest request = createN1qlRequest(query, bucket, username, password, null); + request.subscriber(subscriber); + return core.send(request); } }); } else { @@ -470,8 +476,14 @@ public Boolean call(NodeInfo nodeInfo) { public Observable call(NodeInfo nodeInfo) { try { InetAddress hostname = InetAddress.getByName(nodeInfo.hostname().address()); - GenericQueryRequest req = createN1qlRequest(query, bucket, username, password, hostname); - return core.send(req); + final GenericQueryRequest req = createN1qlRequest(query, bucket, username, password, hostname); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + req.subscriber(subscriber); + return core.send(req); + } + }); } catch (UnknownHostException e) { return Observable.error(e); } diff --git a/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java b/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java index d943997f..e381fb29 100644 --- a/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java +++ b/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java @@ -17,6 +17,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -30,11 +31,11 @@ public class ReflectionBasedEntityMetadata implements EntityMetadata { private final List properties; private final PropertyMetadata idProperty; - public ReflectionBasedEntityMetadata(Class sourceEntity) { + public ReflectionBasedEntityMetadata(final Class sourceEntity) { properties = new ArrayList(); PropertyMetadata idProperty = null; - for (Field field : sourceEntity.getDeclaredFields()) { + for (Field field : getAllDeclaredFields(sourceEntity)) { PropertyMetadata property = new ReflectionBasedPropertyMetadata(field); properties.add(property); if (property.isId()) { @@ -45,6 +46,24 @@ public ReflectionBasedEntityMetadata(Class sourceEntity) { this.idProperty = idProperty; } + /** + * Helper method to grab all the declared fields from the given class but also + * from its inherited parents! + * + * @param sourceEntity the source entity to start from. + * @return an iterable of found fields. + */ + private static List getAllDeclaredFields(final Class sourceEntity) { + List fields = new ArrayList(); + Class clazz = sourceEntity; + while (clazz != null) { + Field[] f = clazz.getDeclaredFields(); + fields.addAll(Arrays.asList(f)); + clazz = clazz.getSuperclass(); + } + return fields; + } + @Override public List properties() { return properties; diff --git a/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java b/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java index 736c4cfe..e08a45c0 100644 --- a/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java +++ b/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java @@ -1038,9 +1038,9 @@ protected Observable> doMultiMutate() { } } - Observable> mutations = Observable.defer(new Func0>() { + Observable> mutations = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber s) { List bufList = new ArrayList(mutationSpecs.size()); final List commands = new ArrayList(mutationSpecs.size()); @@ -1063,15 +1063,11 @@ public Observable call() { } } } - return Observable.from(commands); - } - }).toList() - .flatMap(new Func1, Observable>(){ - @Override - public Observable call(List mutationCommands) { - return core.send(new SubMultiMutationRequest(docId, bucketName, + SubMultiMutationRequest request = new SubMultiMutationRequest(docId, bucketName, expiry, cas, SubMultiMutationDocOptionsBuilder.builder().upsertDocument(upsertDocument).insertDocument(insertDocument), - mutationCommands)); + commands); + request.subscriber(s); + return core.send(request); } }).flatMap(new Func1>>() { @Override diff --git a/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java b/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java index 34e9c0f1..e4bf9b4f 100644 --- a/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java +++ b/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java @@ -145,6 +145,7 @@ public Observable n1qlToRawCustom(final N1qlQuery query, final Func1 call(Subscriber s) { RawQueryRequest request = RawQueryRequest.jsonQuery(query.n1ql().toString(), bucket, username, password); + request.subscriber(s); return core.send(request); } }).map(new Func1() { @@ -217,10 +218,11 @@ public Observable ftsToRawCustom(final SearchQuery query, final Func1>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { SearchQueryRequest request = new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }).map(new Func1() {