From 932e922bee264464dfc59320b53f1b7e4c9507b6 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Tue, 6 Mar 2018 10:27:45 +0100 Subject: [PATCH 01/17] Prepare 2.5.6 Release Change-Id: Ia4ca391fc9ed660a9783dcb4d3c907f4f334aeb0 Reviewed-on: http://review.couchbase.org/90498 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 1b7bd017..d6a591d3 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.6-SNAPSHOT + 2.5.6 jar Couchbase Java SDK @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.6-SNAPSHOT + 1.5.6 4.12 1.10.19 1.7.7 From 7fb5fa878fbf10b744c435d514f0d22ec3b259a0 Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Fri, 30 Mar 2018 15:02:09 -0700 Subject: [PATCH 02/17] Start 2.5.7 development Change-Id: I86180c25d0d027dd22acb9d7b8ad6440ef29250b Reviewed-on: http://review.couchbase.org/91922 Reviewed-by: Subhashni Balakrishnan Tested-by: Subhashni Balakrishnan --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d6a591d3..0bb41533 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.6 + 2.5.7-SNAPSHOT jar Couchbase Java SDK From 7f97c14c65eeb55ff5dc21a683335deacd440a2c Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Fri, 30 Mar 2018 15:24:38 -0700 Subject: [PATCH 03/17] Change to core 1.5.7 snapshot Change-Id: I35e2db38ba01613c445cbbc587d27c63cdda367c Reviewed-on: http://review.couchbase.org/91926 Reviewed-by: Subhashni Balakrishnan Tested-by: Subhashni Balakrishnan --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0bb41533..00ca6526 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.6 + 1.5.7-SNAPSHOT 4.12 1.10.19 1.7.7 From 3f88b9d8890ec00681c955364afa0cca48a5381d Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Tue, 6 Mar 2018 16:29:09 -0800 Subject: [PATCH 04/17] Integration test for JVMCBC-513 Change-Id: I74d78f75580e23f8344ce26d8883a52fda939bd8 Reviewed-on: http://review.couchbase.org/91924 Tested-by: Subhashni Balakrishnan Reviewed-by: Subhashni Balakrishnan --- .../java/com/couchbase/client/java/ConnectionTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/integration/java/com/couchbase/client/java/ConnectionTest.java b/src/integration/java/com/couchbase/client/java/ConnectionTest.java index 444e0f4c..b2431b63 100644 --- a/src/integration/java/com/couchbase/client/java/ConnectionTest.java +++ b/src/integration/java/com/couchbase/client/java/ConnectionTest.java @@ -75,6 +75,12 @@ public void shouldThrowConfigurationExceptionForWrongBucketPassword() { cluster.openBucket(TestProperties.bucket(), "completelyWrongPassword"); } + @Test + public void shouldBootstrapWithBadHost() { + Cluster cluster = CouchbaseCluster.create("badnode", TestProperties.seedNode()); + cluster.openBucket(TestProperties.bucket(), TestProperties.password()); + } + @Test public void shouldCacheBucketReference() { Cluster cluster = CouchbaseCluster.create(TestProperties.seedNode()); From c69781893831740d572c1b15f1726fa1a069b89f Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Thu, 8 Mar 2018 16:43:01 -0800 Subject: [PATCH 05/17] JCBC-1175 Fix incorrect mapping of queue subdoc remove Motivation ---------- Simulataneuously queue mutation could cause incorrect behavior of queue pop Changes ------- If the recursion returns null, there was a cas mismatch so dont map the result as success, instead return cas mismatch exception. Results ------- Multithreaded queue pop test verifies the change. Change-Id: I9a9a831c5b5e299e41f59f0b25ad4d45c28165c2 Reviewed-on: http://review.couchbase.org/91925 Reviewed-by: Matt Ingenthron Tested-by: Subhashni Balakrishnan --- .../datastructures/DataStructuresTest.java | 54 +++++++++++++++++++ .../client/java/CouchbaseAsyncBucket.java | 3 ++ 2 files changed, 57 insertions(+) diff --git a/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java b/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java index 4dd74757..364a727b 100644 --- a/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java +++ b/src/integration/java/com/couchbase/client/java/datastructures/DataStructuresTest.java @@ -19,7 +19,12 @@ import com.couchbase.client.java.PersistTo; import com.couchbase.client.java.error.subdoc.PathInvalidException; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.java.bucket.BucketType; @@ -34,6 +39,8 @@ import com.couchbase.client.java.util.CouchbaseTestContext; import com.couchbase.client.java.util.features.CouchbaseFeature; import org.junit.*; +import rx.functions.Action0; +import rx.functions.Action1; public class DataStructuresTest { @@ -357,4 +364,51 @@ public void testSetSizeOnNonExistentDocument() { ctx.bucket().setSize("dssetRandom"); } + + @Test + public void testMultiThreadQueuePop() throws Exception { + ctx.bucket().queuePush("testMultiThreadQueuePop", 1, MutationOptionBuilder.builder().createDocument(true)); + + final CountDownLatch latch = new CountDownLatch(10); + ExecutorService pool = Executors.newFixedThreadPool(10); + final AtomicInteger atomicInteger = new AtomicInteger(); + + for (int i=0; i < 10; i++) { + pool.submit(new Runnable() { + @Override + public void run() { + ctx.bucket().async().queuePop("testMultiThreadQueuePop", Integer.class) + .subscribe( + new Action1() { + @Override + public void call(Integer val) { + if (val == 1) { + atomicInteger.incrementAndGet(); + } else { + Assert.assertEquals(val, null); + } + } + }, + new Action1() { + @Override + public void call(Throwable throwable) { + //ignore + latch.countDown(); + } + }, + new Action0() { + @Override + public void call() { + latch.countDown(); + } + } + + ); + } + }); + } + latch.await(); + Assert.assertEquals(1, atomicInteger.get()); + } + } \ No newline at end of file diff --git a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java index 530bc0b8..372f3cad 100644 --- a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java +++ b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java @@ -2026,6 +2026,9 @@ public Observable> call(Throwable throwable .map(new Func1>() { @Override public DocumentFragment call(E element) { + if (element == null) { + throw new CASMismatchException(); + } return ResultMappingUtils.convertToSubDocumentResult(ResponseStatus.SUCCESS, mutationOperation, element); } }); From 037f7250f1aae5ba045dd25c552e64589dd88494 Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Fri, 30 Mar 2018 10:57:57 -0700 Subject: [PATCH 06/17] JCBC-1179 Add profile option to N1qlParams Change-Id: Icbd317c44153b85ab213a42160c1152e4fe89bc9 Reviewed-on: http://review.couchbase.org/91928 Reviewed-by: Subhashni Balakrishnan Tested-by: Subhashni Balakrishnan --- .../couchbase/client/java/N1qlQueryTest.java | 18 ++++++ .../client/java/query/N1qlParams.java | 21 +++++++ .../client/java/query/N1qlProfile.java | 58 +++++++++++++++++++ 3 files changed, 97 insertions(+) create mode 100644 src/main/java/com/couchbase/client/java/query/N1qlProfile.java diff --git a/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java b/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java index c0253042..419658d6 100644 --- a/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java +++ b/src/integration/java/com/couchbase/client/java/N1qlQueryTest.java @@ -340,4 +340,22 @@ public void shouldWorkWithPrettyFalse() { assertNotNull(result.allRows()); assertNotNull(result.errors()); } + + @Test + public void shouldWorkWithProfileInfoEnabled() { + ctx.ignoreIfClusterUnder(Version.parseVersion("4.5.1")); + N1qlQuery query = N1qlQuery.simple( + select("*").fromCurrentBucket().limit(1), + N1qlParams.build().profile(N1qlProfile.TIMINGS).consistency(CONSISTENCY) + ); + + N1qlQueryResult result = ctx.bucket().query(query); + assertEquals(1, result.allRows().size()); + assertTrue(result.parseSuccess()); + assertTrue(result.finalSuccess()); + assertNotNull(result.info()); + assertNotNull(result.allRows()); + assertNotNull(result.errors()); + assertTrue(result.profileInfo().size() > 0); + } } diff --git a/src/main/java/com/couchbase/client/java/query/N1qlParams.java b/src/main/java/com/couchbase/client/java/query/N1qlParams.java index 47c5aaee..24eaa236 100644 --- a/src/main/java/com/couchbase/client/java/query/N1qlParams.java +++ b/src/main/java/com/couchbase/client/java/query/N1qlParams.java @@ -64,6 +64,7 @@ public class N1qlParams implements Serializable { private Map rawParams; private boolean pretty; private boolean readonly; + private N1qlProfile profile; private final Map credentials; @@ -142,6 +143,10 @@ public void injectParams(JsonObject queryJson) { queryJson.put("readonly", true); } + if (this.profile != null) { + queryJson.put("profile", this.profile.toString()); + } + if (this.rawParams != null) { for (Map.Entry entry : rawParams.entrySet()) { queryJson.put(entry.getKey(), entry.getValue()); @@ -410,6 +415,17 @@ public N1qlParams pipelineCap(int pipelineCap) { return this; } + /** + * Specifies if there should be a profile section returned with the request results. + * + * @param profile the profile param {@link N1qlProfile}. + * @return this {@link N1qlParams} for chaining. + */ + public N1qlParams profile(N1qlProfile profile) { + this.profile = profile; + return this; + } + /** * Allows to specify an arbitrary, raw N1QL param. * @@ -480,6 +496,7 @@ public boolean equals(Object o) { if (mutationState != null ? !mutationState.equals(that.mutationState) : that.mutationState != null) return false; if (!credentials.equals(that.credentials)) return false; + if (profile != that.profile) return false; return rawParams != null ? rawParams.equals(that.rawParams) : that.rawParams == null; } @@ -500,6 +517,7 @@ public int hashCode() { result = 31 * result + (adhoc ? 1 : 0); result = 31 * result + (pretty ? 1 : 0); result = 31 * result + (readonly ? 1 : 0); + result = 31 * result + (profile != null ? profile.hashCode() : 0); return result; } @@ -521,6 +539,9 @@ public String toString() { sb.append(", rawParams=").append(rawParams); if (!credentials.isEmpty()) sb.append(", credentials=").append(credentials.size()); + if (profile != null) { + sb.append(", profile=").append(profile.toString()); + } sb.append('}'); return sb.toString(); } diff --git a/src/main/java/com/couchbase/client/java/query/N1qlProfile.java b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java new file mode 100644 index 00000000..d70b639c --- /dev/null +++ b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2018 Couchbase, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.couchbase.client.java.query; + +/** + * N1ql profile options + */ +public enum N1qlProfile { + + /** + * No profiling information is added to the query response. + */ + OFF { + @Override + public String toString() { + return "off"; + } + }, + + /** + * The query response includes a profile section with stats and details + * about various phases of the query plan and execution. + * Three phase times will be included in the system:active_requests and + * system:completed_requests monitoring keyspaces. + */ + PHASES { + @Override + public String toString() { + return "phases"; + } + }, + + /** + * Besides the phase times, the profile section of the query response document will + * include a full query plan with timing and information about the number of processed + * documents at each phase. This information will be included in the system:active_requests + * and system:completed_requests keyspaces. + */ + TIMINGS { + @Override + public String toString() { + return "timings"; + } + } +} \ No newline at end of file From 9ed9ff5802c6f2a8625819d2dc80e0941bf11060 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Tue, 3 Apr 2018 12:11:09 +0200 Subject: [PATCH 07/17] JCBC-1179: Mark N1qlParams as uncommitted. Since new API shouldn't be marked as insta-stable, the new n1ql profiling API is marked as uncomitted for release25 and will be promoted to stable later. Change-Id: I3762b2081be658879867e86776367723cf341ca2 Reviewed-on: http://review.couchbase.org/92028 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger --- .../java/com/couchbase/client/java/query/N1qlParams.java | 1 + .../java/com/couchbase/client/java/query/N1qlProfile.java | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/src/main/java/com/couchbase/client/java/query/N1qlParams.java b/src/main/java/com/couchbase/client/java/query/N1qlParams.java index 24eaa236..885a8a4a 100644 --- a/src/main/java/com/couchbase/client/java/query/N1qlParams.java +++ b/src/main/java/com/couchbase/client/java/query/N1qlParams.java @@ -421,6 +421,7 @@ public N1qlParams pipelineCap(int pipelineCap) { * @param profile the profile param {@link N1qlProfile}. * @return this {@link N1qlParams} for chaining. */ + @InterfaceStability.Uncommitted public N1qlParams profile(N1qlProfile profile) { this.profile = profile; return this; diff --git a/src/main/java/com/couchbase/client/java/query/N1qlProfile.java b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java index d70b639c..b66357f9 100644 --- a/src/main/java/com/couchbase/client/java/query/N1qlProfile.java +++ b/src/main/java/com/couchbase/client/java/query/N1qlProfile.java @@ -15,9 +15,14 @@ */ package com.couchbase.client.java.query; +import com.couchbase.client.core.annotations.InterfaceAudience; +import com.couchbase.client.core.annotations.InterfaceStability; + /** * N1ql profile options */ +@InterfaceStability.Uncommitted +@InterfaceAudience.Public public enum N1qlProfile { /** From 17b4f94308b4e53beb471f2852ee93179feee38b Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Wed, 4 Apr 2018 11:04:25 -0700 Subject: [PATCH 08/17] Prepare for 2.5.7 release Change-Id: I9722302465a3531f0c3d66389c1f1a831b6ea061 Reviewed-on: http://review.couchbase.org/92132 Reviewed-by: Subhashni Balakrishnan Tested-by: Subhashni Balakrishnan --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 00ca6526..772fb780 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.7-SNAPSHOT + 2.5.7 jar Couchbase Java SDK @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.7-SNAPSHOT + 1.5.7 4.12 1.10.19 1.7.7 From b4932a8a5e603b8f9a557bac9f4c05ed619b49aa Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Mon, 23 Apr 2018 15:16:59 -0700 Subject: [PATCH 09/17] JCBC-1194 Enable service on nodes in the bootstrap list in round robin Change to select new node from bootstrap list in round robin way for retries to enable services on nodes. Change-Id: I4ea9a61313f8e39eb7167702624458068ba552c6 Reviewed-on: http://review.couchbase.org/93137 Reviewed-by: Michael Nitschinger Tested-by: Subhashni Balakrishnan Reviewed-on: http://review.couchbase.org/93505 Reviewed-by: Subhashni Balakrishnan --- .../couchbase/client/java/ConnectionTest.java | 7 ++ .../cluster/DefaultAsyncClusterManager.java | 74 ++++++++++--------- 2 files changed, 48 insertions(+), 33 deletions(-) diff --git a/src/integration/java/com/couchbase/client/java/ConnectionTest.java b/src/integration/java/com/couchbase/client/java/ConnectionTest.java index b2431b63..16446a45 100644 --- a/src/integration/java/com/couchbase/client/java/ConnectionTest.java +++ b/src/integration/java/com/couchbase/client/java/ConnectionTest.java @@ -81,6 +81,13 @@ public void shouldBootstrapWithBadHost() { cluster.openBucket(TestProperties.bucket(), TestProperties.password()); } + @Test + public void shouldProvideClusterInfoWithBadHostInBootstrapList() { + Cluster cluster = CouchbaseCluster.create("1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", TestProperties.seedNode()); + cluster.authenticate(TestProperties.adminName(), TestProperties.adminPassword()); + cluster.clusterManager().info(); + } + @Test public void shouldCacheBucketReference() { Cluster cluster = CouchbaseCluster.create(TestProperties.seedNode()); diff --git a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java index 31d5ad95..59980431 100644 --- a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java +++ b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java @@ -18,6 +18,8 @@ import com.couchbase.client.core.ClusterFacade; import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.core.annotations.InterfaceStability; +import com.couchbase.client.core.logging.CouchbaseLogger; +import com.couchbase.client.core.logging.CouchbaseLoggerFactory; import com.couchbase.client.core.message.config.BucketsConfigRequest; import com.couchbase.client.core.message.config.BucketsConfigResponse; import com.couchbase.client.core.message.config.ClusterConfigRequest; @@ -56,11 +58,13 @@ import rx.functions.Action1; import rx.functions.Func1; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static com.couchbase.client.java.util.retry.RetryBuilder.any; @@ -543,44 +547,48 @@ public Observable call(ClusterInfo clusterInfo) { }); } + Observable sendAddNodeRequest(final InetSocketAddress address) { + final NetworkAddress networkAddress = NetworkAddress.create(address.getAddress().getHostAddress()); + return core.send(new AddNodeRequest(networkAddress)) + .flatMap(new Func1>() { + @Override + public Observable call(AddNodeResponse addNodeResponse) { + if (!addNodeResponse.status().isSuccess()) { + throw new CouchbaseException("Could not enable ClusterManager service to function properly."); + } + int port = environment.sslEnabled() ? environment.bootstrapHttpSslPort() : environment.bootstrapHttpDirectPort(); + return core.send(new AddServiceRequest(ServiceType.CONFIG, username, password, port, networkAddress)); + } + }) + .map(new Func1() { + @Override + public Boolean call(AddServiceResponse addServiceResponse) { + if (!addServiceResponse.status().isSuccess()) { + throw new CouchbaseException("Could not enable ClusterManager service to function properly."); + } + return true; + } + }); + } + private Observable ensureServiceEnabled() { if (connectionString.hosts().isEmpty()) { return Observable.error(new IllegalStateException("No host found in the connection string! " + connectionString.toString())); } - return Observable - .just(connectionString.hosts().get(0).getAddress().getHostAddress()) - .map(new Func1() { - @Override - public NetworkAddress call(String hostname) { - return NetworkAddress.create(hostname); - } - }) - .flatMap(new Func1>() { - @Override - public Observable call(final NetworkAddress hostname) { - return core - .send(new AddNodeRequest(hostname)) - .flatMap(new Func1>() { - @Override - public Observable call(AddNodeResponse response) { - int port = environment.sslEnabled() - ? environment.bootstrapHttpSslPort() : environment.bootstrapHttpDirectPort(); - return core.send(new AddServiceRequest(ServiceType.CONFIG, username, password, - port, hostname)); - } - }); - } - }) - .map(new Func1() { - @Override - public Boolean call(AddServiceResponse addServiceResponse) { - if (!addServiceResponse.status().isSuccess()) { - throw new CouchbaseException("Could not enable ClusterManager service to function properly."); + final AtomicInteger integer = new AtomicInteger(0); + return Observable.just(connectionString.hosts()) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(List inetSocketAddresses) { + int hostIndex = integer.getAndIncrement(); + if (hostIndex >= connectionString.hosts().size()) { + integer.set(0); + return Observable.error(new CouchbaseException("Could not enable ClusterManager service to function properly.")); + } + return sendAddNodeRequest(inetSocketAddresses.get(hostIndex)); } - return true; - } - }); - } + }); + } } \ No newline at end of file From bda88013a759724a228209f79063e53e4d01dee7 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Mon, 30 Apr 2018 21:41:49 +0200 Subject: [PATCH 10/17] Bump to 2.5.8-SNAPSHOT Change-Id: Id78b25a414932c5df4aeb62c474a9fe87c6770d1 Reviewed-on: http://review.couchbase.org/93512 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 772fb780..0d809d46 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.7 + 2.5.8-SNAPSHOT jar Couchbase Java SDK @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.7 + 1.5.8-SNAPSHOT 4.12 1.10.19 1.7.7 From e84d052862a1975471898981272ae156037e0036 Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Tue, 1 May 2018 12:08:11 -0700 Subject: [PATCH 11/17] Prepare for release 2.5.8 Change-Id: I5264ba33d02fe0a549db6595c3cb473d9a7cf631 Reviewed-on: http://review.couchbase.org/93569 Reviewed-by: Subhashni Balakrishnan Tested-by: Subhashni Balakrishnan --- README.md | 2 +- pom.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2e3cb2f6..fef0eb47 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ The easiest way is to download the jar as well as its transitive dependencies (o com.couchbase.client java-client - 2.5.5 + 2.5.8 ``` diff --git a/pom.xml b/pom.xml index 0d809d46..45ea3525 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.8-SNAPSHOT + 2.5.8 jar Couchbase Java SDK @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.8-SNAPSHOT + 1.5.8 4.12 1.10.19 1.7.7 From ca1587604f66451b5b9369abda851e8428888988 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Thu, 24 May 2018 13:49:34 +0200 Subject: [PATCH 12/17] JCBC-1158: Support annotations in parent classes Motivation ---------- On our experimental repository API, it was reported that if the ID (or any other field for that matter) was stored on a parent class it is not picked up properly by our mapper. Modifications ------------- This changeset makes sure that all the fields looked at are recursively picked up from parent classes as well so that if a child class is passed in all the proper fields from parent classes can be used. Test case has been added to verify this. Result ------ Support for fields with annotations in parent classes. Change-Id: I219e1733476d2958699ddec8b4b52eacc6b7f19b Reviewed-on: http://review.couchbase.org/94676 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger Reviewed-on: http://review.couchbase.org/95134 --- .../java/repository/RepositoryTest.java | 54 +++++++++++++++++++ .../ReflectionBasedEntityMetadata.java | 23 +++++++- 2 files changed, 75 insertions(+), 2 deletions(-) diff --git a/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java b/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java index 56348564..459ca538 100644 --- a/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java +++ b/src/integration/java/com/couchbase/client/java/repository/RepositoryTest.java @@ -17,6 +17,7 @@ import com.couchbase.client.java.document.EntityDocument; import com.couchbase.client.java.document.JsonDocument; +import com.couchbase.client.java.repository.annotation.Field; import com.couchbase.client.java.repository.annotation.Id; import com.couchbase.client.java.repository.mapping.RepositoryMappingException; import com.couchbase.client.java.util.ClusterDependentTest; @@ -136,6 +137,27 @@ public void shouldFailWithNonStringIdProperty() { repository().upsert(EntityDocument.create(new EntityWithNoNStringId())); } + @Test + public void shouldUpsertExtendedEntity() { + Child entity = new Child("myid", "myname"); + EntityDocument document = EntityDocument.create(entity); + + assertFalse(repository().exists(document)); + assertEquals(0, document.cas()); + EntityDocument stored = repository().upsert(document); + assertNotEquals(0, stored.cas()); + assertEquals(document.content(), stored.content()); + + JsonDocument storedRaw = bucket().get(entity.getId()); + assertEquals(entity.getName(), storedRaw.content().getString("name")); + + EntityDocument found = repository().get(entity.getId(), Child.class); + assertEquals(found.cas(), stored.cas()); + assertNotEquals(0, found.cas()); + + assertTrue(repository().exists(document)); + } + static class EntityWithoutId { } @@ -149,4 +171,36 @@ static class EntityWithNoNStringId { Date id = new Date(); } + + public static abstract class Parent { + @Id + private String id; + + Parent(String id) { + this.id = id; + } + + public String getId() { + return id; + } + } + + public static class Child extends Parent { + @Field + private String name; + + public Child() { + super(null); + } + + public Child(String id, String name) { + super(id); + this.name = name; + } + + public String getName() { + return name; + } + } + } diff --git a/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java b/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java index d943997f..e381fb29 100644 --- a/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java +++ b/src/main/java/com/couchbase/client/java/repository/mapping/ReflectionBasedEntityMetadata.java @@ -17,6 +17,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -30,11 +31,11 @@ public class ReflectionBasedEntityMetadata implements EntityMetadata { private final List properties; private final PropertyMetadata idProperty; - public ReflectionBasedEntityMetadata(Class sourceEntity) { + public ReflectionBasedEntityMetadata(final Class sourceEntity) { properties = new ArrayList(); PropertyMetadata idProperty = null; - for (Field field : sourceEntity.getDeclaredFields()) { + for (Field field : getAllDeclaredFields(sourceEntity)) { PropertyMetadata property = new ReflectionBasedPropertyMetadata(field); properties.add(property); if (property.isId()) { @@ -45,6 +46,24 @@ public ReflectionBasedEntityMetadata(Class sourceEntity) { this.idProperty = idProperty; } + /** + * Helper method to grab all the declared fields from the given class but also + * from its inherited parents! + * + * @param sourceEntity the source entity to start from. + * @return an iterable of found fields. + */ + private static List getAllDeclaredFields(final Class sourceEntity) { + List fields = new ArrayList(); + Class clazz = sourceEntity; + while (clazz != null) { + Field[] f = clazz.getDeclaredFields(); + fields.addAll(Arrays.asList(f)); + clazz = clazz.getSuperclass(); + } + return fields; + } + @Override public List properties() { return properties; From 4fa5c7158ef83ec96abc13615aa5530cb804d636 Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Fri, 1 Jun 2018 11:51:46 -0700 Subject: [PATCH 13/17] JCBC-1207: Dont do reverse address lookup for DNS SRV bootstrap Motivation ---------- Changes for JVMCBC-513 made sure the addresses can be resolved to fix unhandled NPE. This check is not necessary for DNS SRV bootstrap. Changes ------- Use the unresolved hosts from ConnectionString in core. Changed the bad ips to x.y.z on the tests as they resolve now. Change-Id: I0821c247923144c103416d0a43d8ddf6ec2c9b73 Reviewed-on: http://review.couchbase.org/95228 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger --- .../java/com/couchbase/client/java/ConnectionTest.java | 9 ++++++++- .../com/couchbase/client/java/CouchbaseAsyncCluster.java | 6 +++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/integration/java/com/couchbase/client/java/ConnectionTest.java b/src/integration/java/com/couchbase/client/java/ConnectionTest.java index 16446a45..0b64682e 100644 --- a/src/integration/java/com/couchbase/client/java/ConnectionTest.java +++ b/src/integration/java/com/couchbase/client/java/ConnectionTest.java @@ -83,7 +83,14 @@ public void shouldBootstrapWithBadHost() { @Test public void shouldProvideClusterInfoWithBadHostInBootstrapList() { - Cluster cluster = CouchbaseCluster.create("1.1.1.1", "2.2.2.2", "3.3.3.3", "4.4.4.4", TestProperties.seedNode()); + Cluster cluster = CouchbaseCluster.create("x.y.z", TestProperties.seedNode()); + cluster.authenticate(TestProperties.adminName(), TestProperties.adminPassword()); + cluster.clusterManager().info(); + } + + @Test(expected = NullPointerException.class) + public void shouldNotCheckReverseLookupWhenDNSSRVEnabled() throws Exception { + Cluster cluster = CouchbaseCluster.create(DefaultCouchbaseEnvironment.builder().dnsSrvEnabled(true).build(), "x.y.z"); cluster.authenticate(TestProperties.adminName(), TestProperties.adminPassword()); cluster.clusterManager().info(); } diff --git a/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java b/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java index a321fd12..dcc588a8 100644 --- a/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java +++ b/src/main/java/com/couchbase/client/java/CouchbaseAsyncCluster.java @@ -325,11 +325,11 @@ private static List assembleSeedNodes(ConnectionString connectionString, */ private static void seedNodesViaDnsSrv(ConnectionString connectionString, CouchbaseEnvironment environment, List seedNodes) { - if (connectionString.hosts().size() == 1) { - InetSocketAddress lookupNode = connectionString.hosts().get(0); + if (connectionString.allHosts().size() == 1) { + InetSocketAddress lookupNode = connectionString.allHosts().get(0); LOGGER.debug( "Attempting to load DNS SRV records from {}.", - connectionString.hosts().get(0) + connectionString.allHosts().get(0) ); try { From d7dcec6d0def6d1a91ad3b57a08373092d1b6da9 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Tue, 5 Jun 2018 14:00:01 +0200 Subject: [PATCH 14/17] JCBC-1209: Audit and add explicit subscribers on req creation Motivation ---------- In an earlier changeset the explicit subscriber management on the request has been added, but some of the APIs have been overlooked - this can lead to inconsistent behavior in core-io. Modifications ------------- This changeset audits all calls to core.send(...) and where needed adds the explicit subscriber management on the request since core-io checks if a subscriber is attached and if not it never treats it as non-active. Especially n1ql requests in this regard have been treaded differently before this changeset compared to kv requests. Result ------ Complete audit of .send() APIs and every request now properly handles it subscriber and can be checked against timeouts in core-io. Change-Id: I71f752384af7ac4fa88534531401901f92a1fb24 Reviewed-on: http://review.couchbase.org/95227 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger --- .../client/java/CouchbaseAsyncBucket.java | 19 ++- .../analytics/AnalyticsQueryExecutor.java | 12 +- .../client/java/bucket/BucketFlusher.java | 34 +++- .../bucket/DefaultAsyncBucketManager.java | 19 ++- .../client/java/bucket/ReplicaReader.java | 24 +-- .../cluster/DefaultAsyncClusterManager.java | 153 ++++++++++++------ .../java/cluster/api/AsyncRestBuilder.java | 11 +- .../java/query/core/N1qlQueryExecutor.java | 28 +++- .../java/subdoc/AsyncMutateInBuilder.java | 16 +- .../rawQuerying/AsyncRawQueryExecutor.java | 6 +- 10 files changed, 223 insertions(+), 99 deletions(-) diff --git a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java index 372f3cad..fa26c260 100644 --- a/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java +++ b/src/main/java/com/couchbase/client/java/CouchbaseAsyncBucket.java @@ -813,11 +813,12 @@ public Observable call(Throwable throwable) { @Override public Observable query(final ViewQuery query) { - Observable source = Observable.defer(new Func0>() { + Observable source = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(), query.isDevelopment(), query.toQueryString(), query.getKeys(), bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }); @@ -841,11 +842,12 @@ public Observable query(final SearchQuery query) { query.serverSideTimeout(environment().searchTimeout(), TimeUnit.MILLISECONDS); } - Observable source = Observable.defer(new Func0>() { + Observable source = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { final SearchQueryRequest request = - new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password); + new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }); @@ -872,11 +874,12 @@ public AsyncSearchQueryResult call(SearchQueryResponse response) { @Override public Observable query(final SpatialViewQuery query) { - Observable source = Observable.defer(new Func0>() { + Observable source = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { final ViewQueryRequest request = new ViewQueryRequest(query.getDesign(), query.getView(), - query.isDevelopment(), true, query.toString(), null, bucket, username, password); + query.isDevelopment(), true, query.toString(), null, bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }); diff --git a/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java b/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java index 7e6dd5e7..e4f4c0ab 100644 --- a/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java +++ b/src/main/java/com/couchbase/client/java/analytics/AnalyticsQueryExecutor.java @@ -23,7 +23,7 @@ import com.couchbase.client.java.error.TranscodingException; import com.couchbase.client.java.transcoder.TranscoderUtils; import rx.Observable; -import rx.functions.Func0; +import rx.Subscriber; import rx.functions.Func1; import rx.functions.Func6; @@ -31,6 +31,7 @@ import java.util.List; import static com.couchbase.client.java.CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; public class AnalyticsQueryExecutor { @@ -47,10 +48,13 @@ public AnalyticsQueryExecutor(ClusterFacade core, String bucket, String username } public Observable execute(final AnalyticsQuery query) { - return Observable.defer(new Func0>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { - return core.send(GenericAnalyticsRequest.jsonQuery(query.query().toString(), bucket, username, password)); + public Observable call(final Subscriber subscriber) { + GenericAnalyticsRequest request = GenericAnalyticsRequest + .jsonQuery(query.query().toString(), bucket, username, password); + request.subscriber(subscriber); + return core.send(request); } }).flatMap(new Func1>() { @Override diff --git a/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java b/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java index fab37dee..13bc3918 100644 --- a/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java +++ b/src/main/java/com/couchbase/client/java/bucket/BucketFlusher.java @@ -31,6 +31,7 @@ import com.couchbase.client.deps.io.netty.util.CharsetUtil; import com.couchbase.client.java.error.FlushDisabledException; import rx.Observable; +import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; import rx.functions.Func2; @@ -39,6 +40,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; import static com.couchbase.client.java.util.retry.RetryBuilder.any; /** @@ -118,8 +120,15 @@ private static Observable> createMarkerDocuments(final ClusterFacad .from(FLUSH_MARKERS) .flatMap(new Func1>() { @Override - public Observable call(String id) { - return core.send(new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket)); + public Observable call(final String id) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(final Subscriber subscriber) { + UpsertRequest request = new UpsertRequest(id, Unpooled.copiedBuffer(id, CharsetUtil.UTF_8), bucket); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .doOnNext(new Action1() { @@ -152,8 +161,14 @@ public List call(UpsertResponse response) { * @return an observable indicating if done (true) or polling needs to happen (false). */ private static Observable initiateFlush(final ClusterFacade core, final String bucket, final String username, final String password) { - return core - .send(new FlushRequest(bucket, username, password)) + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + FlushRequest request = new FlushRequest(bucket, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) .map(new Func1() { @Override @@ -182,8 +197,15 @@ private static Observable pollMarkerDocuments(final ClusterFacade core, .from(FLUSH_MARKERS) .flatMap(new Func1>() { @Override - public Observable call(String id) { - return core.send(new GetRequest(id, bucket)); + public Observable call(final String id) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + GetRequest request = new GetRequest(id, bucket); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .reduce(0, new Func2() { diff --git a/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java b/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java index 76bc8c21..6f305719 100644 --- a/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java +++ b/src/main/java/com/couchbase/client/java/bucket/DefaultAsyncBucketManager.java @@ -117,7 +117,15 @@ public Observable info() { return Observable.defer(new Func0>() { @Override public Observable call() { - return core.send(new BucketConfigRequest("/pools/default/buckets/", null, bucket, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + BucketConfigRequest request = new BucketConfigRequest("/pools/default/buckets/", + null, bucket, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -151,7 +159,14 @@ public Observable getDesignDocuments(final boolean development) return Observable.defer(new Func0>() { @Override public Observable call() { - return core.send(new GetDesignDocumentsRequest(bucket, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(final Subscriber subscriber) { + GetDesignDocumentsRequest request = new GetDesignDocumentsRequest(bucket, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) diff --git a/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java b/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java index 94ba54a0..6d379828 100644 --- a/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java +++ b/src/main/java/com/couchbase/client/java/bucket/ReplicaReader.java @@ -33,11 +33,13 @@ import com.couchbase.client.java.error.CouchbaseOutOfMemoryException; import com.couchbase.client.java.error.TemporaryFailureException; import rx.Observable; +import rx.Subscriber; import rx.functions.Func0; import rx.functions.Func1; import java.util.ArrayList; import java.util.List; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; /** * Helper class to deal with reading from zero to N replicas and returning results. @@ -70,15 +72,19 @@ private ReplicaReader() {} public static Observable read(final ClusterFacade core, final String id, final ReplicaMode type, final String bucket) { return assembleRequests(core, id, type, bucket) - .flatMap(new Func1>() { - @Override - public Observable call(BinaryRequest request) { - return core - .send(request) - .filter(GetResponseFilter.INSTANCE) - .onErrorResumeNext(GetResponseErrorHandler.INSTANCE); - } - }); + .flatMap(new Func1>() { + @Override + public Observable call(final BinaryRequest request) { + Observable result = deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + request.subscriber(subscriber); + return core.send(request); + } + }).filter(GetResponseFilter.INSTANCE); + return result.onErrorResumeNext(GetResponseErrorHandler.INSTANCE); + } + }); } /** diff --git a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java index 59980431..db5e5417 100644 --- a/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java +++ b/src/main/java/com/couchbase/client/java/cluster/DefaultAsyncClusterManager.java @@ -55,6 +55,7 @@ import com.couchbase.client.java.error.InvalidPasswordException; import com.couchbase.client.java.error.TranscodingException; import rx.Observable; +import rx.Subscriber; import rx.functions.Action1; import rx.functions.Func1; @@ -66,6 +67,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; import static com.couchbase.client.java.util.retry.RetryBuilder.any; public class DefaultAsyncClusterManager implements AsyncClusterManager { @@ -108,7 +110,14 @@ public Observable info() { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - return core.send(new ClusterConfigRequest(username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + ClusterConfigRequest request = new ClusterConfigRequest(username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -142,7 +151,14 @@ public Observable getBuckets() { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - return core.send(new BucketsConfigRequest(username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + BucketsConfigRequest request = new BucketsConfigRequest(username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -235,7 +251,14 @@ public Observable removeBucket(final String name) { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - return core.send(new RemoveBucketRequest(name, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + RemoveBucketRequest request = new RemoveBucketRequest(name, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -262,7 +285,14 @@ public void call(Boolean exists) { }).flatMap(new Func1>() { @Override public Observable call(Boolean exists) { - return core.send(new InsertBucketRequest(payload, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + InsertBucketRequest request = new InsertBucketRequest(payload, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -292,7 +322,15 @@ public void call(Boolean exists) { }).flatMap(new Func1>() { @Override public Observable call(Boolean exists) { - return core.send(new UpdateBucketRequest(settings.name(), payload, username, password)); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + UpdateBucketRequest request = new UpdateBucketRequest( + settings.name(), payload, username, password); + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) @@ -311,48 +349,64 @@ public BucketSettings call(UpdateBucketResponse response) { public Observable upsertUser(final AuthDomain domain, final String userid, final UserSettings userSettings) { final String payload = getUserSettingsPayload(userSettings); return ensureServiceEnabled() - .flatMap(new Func1>() { - @Override - public Observable call(Boolean aBoolean) { - return core.send(new UpsertUserRequest(username, password, domain.alias(), userid, payload)); - } - }) - .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) - .map(new Func1() { - @Override - public Boolean call(UpsertUserResponse response) { - if (!response.status().isSuccess()) { - StringBuilder sb = new StringBuilder(); - sb.append("Could not update user: "); - sb.append(response.status()); - if (response.message().length() > 0) { - sb.append(", "); - sb.append("msg: "); - sb.append(response.message()); - } - throw new CouchbaseException(sb.toString()); - } - return true; - } - }); + .flatMap(new Func1>() { + @Override + public Observable call(Boolean aBoolean) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + UpsertUserRequest request = new UpsertUserRequest( + username, password, domain.alias(), userid, payload); + request.subscriber(subscriber); + return core.send(request); + } + }); + } + }) + .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) + .map(new Func1() { + @Override + public Boolean call(UpsertUserResponse response) { + if (!response.status().isSuccess()) { + StringBuilder sb = new StringBuilder(); + sb.append("Could not update user: "); + sb.append(response.status()); + if (response.message().length() > 0) { + sb.append(", "); + sb.append("msg: "); + sb.append(response.message()); + } + throw new CouchbaseException(sb.toString()); + } + return true; + } + }); } @Override public Observable removeUser(final AuthDomain domain, final String userid) { return ensureServiceEnabled() - .flatMap(new Func1>() { - @Override - public Observable call(Boolean aBoolean) { - return core.send(new RemoveUserRequest(username, password, domain.alias(), userid)); - } - }) - .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) - .map(new Func1() { - @Override - public Boolean call(RemoveUserResponse response) { - return response.status().isSuccess(); - } - }); + .flatMap(new Func1>() { + @Override + public Observable call(Boolean aBoolean) { + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + RemoveUserRequest request = new RemoveUserRequest( + username, password, domain.alias(), userid); + request.subscriber(subscriber); + return core.send(request); + } + }); + } + }) + .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) + .map(new Func1() { + @Override + public Boolean call(RemoveUserResponse response) { + return response.status().isSuccess(); + } + }); } @Override @@ -366,11 +420,16 @@ public Observable getUser(final AuthDomain domain, final String userid) { .flatMap(new Func1>() { @Override public Observable call(Boolean aBoolean) { - if (userid == null || userid.isEmpty()) { - return core.send(GetUsersRequest.usersFromDomain(username, password, domain.alias())); - } else { - return core.send(GetUsersRequest.user(username, password, domain.alias(), userid)); - } + final GetUsersRequest request = (userid == null || userid.isEmpty()) + ? GetUsersRequest.usersFromDomain(username, password, domain.alias()) + : GetUsersRequest.user(username, password, domain.alias(), userid); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + request.subscriber(subscriber); + return core.send(request); + } + }); } }) .retryWhen(any().delay(Delay.fixed(100, TimeUnit.MILLISECONDS)).max(Integer.MAX_VALUE).build()) diff --git a/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java b/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java index 9e9ab985..4c734ab9 100644 --- a/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java +++ b/src/main/java/com/couchbase/client/java/cluster/api/AsyncRestBuilder.java @@ -30,7 +30,11 @@ import com.couchbase.client.java.document.json.JsonObject; import com.couchbase.client.java.document.json.JsonValue; import rx.Observable; +import rx.Subscriber; import rx.functions.Func0; +import rx.functions.Func1; + +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; /** * A builder class to incrementally construct REST API requests and execute @@ -234,12 +238,13 @@ public RestApiRequest asRequest() { * @return an {@link Observable} of the result of the API call, which is a {@link RestApiResponse}. */ public Observable execute() { - return Observable.defer(new Func0>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { RestApiRequest apiRequest = asRequest(); LOGGER.debug("Executing Cluster API request {} on {}", apiRequest.method(), apiRequest.pathWithParameters()); - return core.send(asRequest()); + apiRequest.subscriber(subscriber); + return core.send(apiRequest); } }); } diff --git a/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java b/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java index e3c4de6b..bdcc9ef0 100644 --- a/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java +++ b/src/main/java/com/couchbase/client/java/query/core/N1qlQueryExecutor.java @@ -53,6 +53,7 @@ import com.couchbase.client.java.transcoder.TranscoderUtils; import com.couchbase.client.java.util.LRUCache; import rx.Observable; +import rx.Subscriber; import rx.exceptions.CompositeException; import rx.functions.Func0; import rx.functions.Func1; @@ -68,6 +69,7 @@ import java.util.Map; import static com.couchbase.client.java.CouchbaseAsyncBucket.JSON_OBJECT_TRANSCODER; +import static com.couchbase.client.java.util.OnSubscribeDeferAndWatch.deferAndWatch; /** * A class used to execute various N1QL queries. @@ -186,10 +188,12 @@ public Observable execute(final N1qlQuery query) { * @return a result containing all found rows and additional information. */ protected Observable executeQuery(final N1qlQuery query) { - return Observable.defer(new Func0>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { - return core.send(createN1qlRequest(query, bucket, username, password, null)); + public Observable call(Subscriber subscriber) { + GenericQueryRequest request = createN1qlRequest(query, bucket, username, password, null); + request.subscriber(subscriber); + return core.send(request); } }).flatMap(new Func1>() { @Override @@ -439,10 +443,12 @@ protected Observable prepare(Statement statement) { if (isEncodedPlanEnabled()) { //we'll include the encodedPlan in each EXECUTE, so we don't broadcast during PREPARE - source = Observable.defer(new Func0>() { + source = deferAndWatch(new Func1>() { @Override - public Observable call() { - return core.send(createN1qlRequest(query, bucket, username, password, null)); + public Observable call(Subscriber subscriber) { + GenericQueryRequest request = createN1qlRequest(query, bucket, username, password, null); + request.subscriber(subscriber); + return core.send(request); } }); } else { @@ -470,8 +476,14 @@ public Boolean call(NodeInfo nodeInfo) { public Observable call(NodeInfo nodeInfo) { try { InetAddress hostname = InetAddress.getByName(nodeInfo.hostname().address()); - GenericQueryRequest req = createN1qlRequest(query, bucket, username, password, hostname); - return core.send(req); + final GenericQueryRequest req = createN1qlRequest(query, bucket, username, password, hostname); + return deferAndWatch(new Func1>() { + @Override + public Observable call(Subscriber subscriber) { + req.subscriber(subscriber); + return core.send(req); + } + }); } catch (UnknownHostException e) { return Observable.error(e); } diff --git a/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java b/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java index 736c4cfe..e08a45c0 100644 --- a/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java +++ b/src/main/java/com/couchbase/client/java/subdoc/AsyncMutateInBuilder.java @@ -1038,9 +1038,9 @@ protected Observable> doMultiMutate() { } } - Observable> mutations = Observable.defer(new Func0>() { + Observable> mutations = deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber s) { List bufList = new ArrayList(mutationSpecs.size()); final List commands = new ArrayList(mutationSpecs.size()); @@ -1063,15 +1063,11 @@ public Observable call() { } } } - return Observable.from(commands); - } - }).toList() - .flatMap(new Func1, Observable>(){ - @Override - public Observable call(List mutationCommands) { - return core.send(new SubMultiMutationRequest(docId, bucketName, + SubMultiMutationRequest request = new SubMultiMutationRequest(docId, bucketName, expiry, cas, SubMultiMutationDocOptionsBuilder.builder().upsertDocument(upsertDocument).insertDocument(insertDocument), - mutationCommands)); + commands); + request.subscriber(s); + return core.send(request); } }).flatMap(new Func1>>() { @Override diff --git a/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java b/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java index 34e9c0f1..e4bf9b4f 100644 --- a/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java +++ b/src/main/java/com/couchbase/client/java/util/rawQuerying/AsyncRawQueryExecutor.java @@ -145,6 +145,7 @@ public Observable n1qlToRawCustom(final N1qlQuery query, final Func1 call(Subscriber s) { RawQueryRequest request = RawQueryRequest.jsonQuery(query.n1ql().toString(), bucket, username, password); + request.subscriber(s); return core.send(request); } }).map(new Func1() { @@ -217,10 +218,11 @@ public Observable ftsToRawCustom(final SearchQuery query, final Func1>() { + return deferAndWatch(new Func1>() { @Override - public Observable call() { + public Observable call(Subscriber subscriber) { SearchQueryRequest request = new SearchQueryRequest(indexName, query.export().toString(), bucket, username, password); + request.subscriber(subscriber); return core.send(request); } }).map(new Func1() { From 910401db39bdc3b30ac8abacf6b5b220a70940e2 Mon Sep 17 00:00:00 2001 From: Subhashni Balakrishnan Date: Wed, 6 Jun 2018 13:13:10 -0700 Subject: [PATCH 15/17] Fix missing import on backport for JCBC-1207 Change-Id: Ia72ee684f687b486659de9c0103642ba7768f529 Reviewed-on: http://review.couchbase.org/95271 Reviewed-by: Subhashni Balakrishnan Tested-by: Subhashni Balakrishnan --- .../java/com/couchbase/client/java/ConnectionTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/integration/java/com/couchbase/client/java/ConnectionTest.java b/src/integration/java/com/couchbase/client/java/ConnectionTest.java index 0b64682e..7eadf1a3 100644 --- a/src/integration/java/com/couchbase/client/java/ConnectionTest.java +++ b/src/integration/java/com/couchbase/client/java/ConnectionTest.java @@ -23,6 +23,7 @@ import static org.junit.Assume.assumeTrue; import com.couchbase.client.java.cluster.ClusterManager; +import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; import com.couchbase.client.java.error.BucketDoesNotExistException; import com.couchbase.client.java.error.InvalidPasswordException; import com.couchbase.client.java.util.TestProperties; From 6d0c16d683633284a08e85ee8835c03ce0ad639d Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Thu, 7 Jun 2018 20:01:20 +0200 Subject: [PATCH 16/17] Prepare 2.5.9 Release Change-Id: I0f3c1e932493bc8ee4fb9a6a3a2b432c227111c0 Reviewed-on: http://review.couchbase.org/95327 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger --- README.md | 2 +- pom.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index fef0eb47..1261fba2 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ The easiest way is to download the jar as well as its transitive dependencies (o com.couchbase.client java-client - 2.5.8 + 2.5.9 ``` diff --git a/pom.xml b/pom.xml index 45ea3525..bb13a3fd 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.8 + 2.5.9 jar Couchbase Java SDK @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.8 + 1.5.9 4.12 1.10.19 1.7.7 From 245ca7f45cef894cdf0243f26dd604d9f4fd1edd Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Fri, 14 Sep 2018 14:00:56 +0200 Subject: [PATCH 17/17] Start 2.5.10 Development Change-Id: Ib879e6e983e81db44c939064a5af46839c438eb2 Reviewed-on: http://review.couchbase.org/99605 Reviewed-by: Michael Nitschinger Tested-by: Michael Nitschinger --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index bb13a3fd..10a1e06d 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ com.couchbase.client java-client - 2.5.9 + 2.5.10-SNAPSHOT jar Couchbase Java SDK @@ -28,7 +28,7 @@ UTF-8 UTF-8 - 1.5.9 + 1.5.10-SNAPSHOT 4.12 1.10.19 1.7.7