Skip to content

Commit

Permalink
Rebased, fixed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nakomis committed Mar 27, 2014
1 parent ac41cc6 commit 55c6ffa
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 26 deletions.
2 changes: 1 addition & 1 deletion examples/webapps/hello-world-sql/src/main/webapp/mongo.jsp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<%@ page language="java" import="java.sql.*"%>
<%@ page language="java" import="java.sql.*,com.mongodb.*"%>

<html>
<head>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package brooklyn.entity.nosql.mongodb;

import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,14 +62,14 @@ public void customize() {
@Override
public boolean isRunning() {
try {
return MongoDBClientSupport.forServer((AbstractMongoDBServer) entity).getServerStatus().get("ok").equals(1.0);
return MongoDBClientSupport.forServer((AbstractMongoDBServer) entity).ping();
} catch (Exception e) {
return false;
}
}

/**
* Kills the server with SIGINT. Sending SIGKILL is likely to resuult in data corruption.
* Kills the server with SIGINT. Sending SIGKILL is likely to result in data corruption.
* @see <a href="http://docs.mongodb.org/manual/tutorial/manage-mongodb-processes/#sending-a-unix-int-or-term-signal">http://docs.mongodb.org/manual/tutorial/manage-mongodb-processes/#sending-a-unix-int-or-term-signal</a>
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ public BasicBSONObject getServerStatus() {
return EMPTY_RESPONSE;
}
}

public boolean ping() {
DBObject ping = new BasicDBObject("ping", "1");
try {
runDBCommand("admin", ping);
} catch (MongoException e) {
return false;
}
return true;
}

public boolean initializeReplicaSet(String replicaSetName, Integer id) {
HostAndPort primary = getServerHostAndPort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ protected EntitySpec<?> getMemberSpec() {
return EntitySpec.create(MongoDBConfigServer.class);
}


protected boolean calculateServiceUp() {
// Number of config servers is fixed at INITIAL_SIZE
int requiredMembers = this.getConfig(INITIAL_SIZE);
int availableMembers = 0;
for (Entity entity : getMembers()) {
if (entity instanceof MongoDBConfigServer & entity.getAttribute(SERVICE_UP)) {
availableMembers++;
}
}
return availableMembers == requiredMembers;
}

@Override
public void start(Collection<? extends Location> locs) {
super.start(locs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public interface MongoDBRouter extends AbstractMongoDBServer {
new TypeToken<Iterable<String>>(){}, "mongodb.router.config.servers", "List of host names and ports of the config servers");

AttributeSensor<Integer> SHARD_COUNT = Sensors.newIntegerSensor("mongodb.router.config.shard.count", "Number of shards that have been added");

AttributeSensor<Boolean> RUNNING = Sensors.newBooleanSensor("mongodb.router.running", "Indicates that the router is running, "
+ "and can be used to add shards, but is not necessarity available for CRUD operations (e.g. if no shards have been added)");

public void waitForServiceUp(Duration duration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,24 @@
public interface MongoDBRouterCluster extends DynamicCluster {

AttributeSensor<MongoDBRouter> ANY_ROUTER = Sensors.newSensor(MongoDBRouter.class, "mongodb.routercluster.any",
"When set, can be used to access one of the routers in the cluster (usually the first)");
"When set, can be used to access one of the routers in the cluster (usually the first). This will only be set once "
+ "at least one shard has been added, and the router is available for CRUD operations");

AttributeSensor<MongoDBRouter> ANY_RUNNING_ROUTER = Sensors.newSensor(MongoDBRouter.class, "mongodb.routercluster.any.running",
"When set, can be used to access one of the running routers in the cluster (usually the first). This should only be used "
+ "to add shards as it does not guarantee that the router is available for CRUD operations");

/**
* @return One of the routers in the cluster if available, null otherwise
*/
MongoDBRouter getAnyRouter();

/**
* @return One of the running routers in the cluster. This should only be used to add shards as it does not guarantee that
* the router is available for CRUD operations
*/
MongoDBRouter getAnyRunningRouter();

/**
* @return All routers in the cluster
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
public class MongoDBRouterClusterImpl extends DynamicClusterImpl implements MongoDBRouterCluster {

@Override
public void start(Collection<? extends Location> locations) {
super.start(locations);
subscribeToMembers(this, Startable.SERVICE_UP, new SensorEventListener<Boolean>() {
public void init() {
super.init();
subscribeToChildren(this, MongoDBRouter.RUNNING, new SensorEventListener<Boolean>() {
public void onEvent(SensorEvent<Boolean> event) {
setAnyRouter();
}
});
}

@Override
public void start(Collection<? extends Location> locations) {
super.start(locations);
AbstractMembershipTrackingPolicy policy = new AbstractMembershipTrackingPolicy(MutableMap.of("name", "Router cluster membership tracker")) {
@Override
protected void onEntityAdded(Entity member) {
Expand All @@ -35,6 +40,10 @@ protected void onEntityAdded(Entity member) {
protected void onEntityRemoved(Entity member) {
setAnyRouter();
}
@Override
protected void onEntityChange(Entity member) {
setAnyRouter();
}
};
addPolicy(policy);
policy.setGroup(this);
Expand All @@ -46,6 +55,12 @@ protected void setAnyRouter() {
public boolean apply(MongoDBRouter input) {
return input.getAttribute(Startable.SERVICE_UP);
}}).orNull());

setAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER, Iterables.tryFind(getRouters(), new Predicate<MongoDBRouter>() {
@Override
public boolean apply(MongoDBRouter input) {
return input.getAttribute(MongoDBRouter.RUNNING);
}}).orNull());
}

@Override
Expand All @@ -72,4 +87,21 @@ public MongoDBRouter getAnyRouter() {
return getAttribute(MongoDBRouterCluster.ANY_ROUTER);
}

@Override
public MongoDBRouter getAnyRunningRouter() {
return getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
}

@Override
protected boolean calculateServiceUp() {
boolean anyRouterUp = false;
for (Entity entity : getMembers()) {
if (entity instanceof MongoDBRouter & entity.getAttribute(SERVICE_UP)) {
anyRouterUp = true;
break;
}
}
return anyRouterUp && super.calculateServiceUp();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@
import brooklyn.event.feed.function.FunctionPollConfig;

import com.google.common.base.Functions;
import com.google.common.base.Optional;
import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DBObject;

public class MongoDBRouterImpl extends SoftwareProcessImpl implements MongoDBRouter {

Expand All @@ -28,13 +24,23 @@ protected void connectSensors() {
super.connectSensors();
serviceUp = FunctionFeed.builder()
.entity(this)
.poll(new FunctionPollConfig<Boolean, Boolean>(RUNNING)
.period(5, TimeUnit.SECONDS)
.callable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
MongoDBClientSupport clientSupport = MongoDBClientSupport.forServer(MongoDBRouterImpl.this);
return clientSupport.ping();
}
})
.onException(Functions.<Boolean>constant(false)))
.poll(new FunctionPollConfig<Boolean, Boolean>(SERVICE_UP)
.period(5, TimeUnit.SECONDS)
.callable(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
MongoDBClientSupport clientSupport = MongoDBClientSupport.forServer(MongoDBRouterImpl.this);
return clientSupport.getServerStatus().get("ok").equals(1.0);
return clientSupport.ping() && MongoDBRouterImpl.this.getAttribute(SHARD_COUNT) > 0;
}
})
.onException(Functions.<Boolean>constant(false)))
Expand All @@ -49,4 +55,10 @@ public Integer call() throws Exception {
.onException(Functions.<Integer>constant(0)))
.build();
}

@Override
protected void disconnectSensors() {
super.disconnectSensors();
serviceUp.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void onEvent(SensorEvent<Boolean> event) {
super.start(locations);

MongoDBRouterCluster routers = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER);
subscribe(routers, MongoDBRouterCluster.ANY_ROUTER, new SensorEventListener<MongoDBRouter>() {
subscribe(routers, MongoDBRouterCluster.ANY_RUNNING_ROUTER, new SensorEventListener<MongoDBRouter>() {
public void onEvent(SensorEvent<MongoDBRouter> event) {
if (event.getValue() != null)
addShards();
Expand All @@ -67,8 +67,12 @@ protected void onEntityRemoved(Entity member) {
policy.setGroup(this);
}

protected boolean calculateServiceUp() {
return addedMembers.size() > 0 && super.calculateServiceUp();
}

protected void addShards() {
MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_ROUTER);
MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
if (router == null)
return;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.trait.Startable;
import brooklyn.location.Location;
import brooklyn.location.MachineLocation;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.exceptions.Exceptions;

import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;

public class MongoDBShardedDeploymentImpl extends AbstractEntity implements MongoDBShardedDeployment {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private MongoDBReplicaSet makeAndStartReplicaSet(final Integer size, final Strin
public void run() {
assertEquals(replicaSet.getCurrentSize(), size);
assertNotNull(replicaSet.getPrimary(), "replica set has no primary");
assertEquals(replicaSet.getPrimary().getReplicaSet().getName(), "test-rs-"+testDescription);
assertEquals(replicaSet.getPrimary().getReplicaSet().getName(), "test-rs-"+testDescription+replicaSet.getId());
assertEquals(replicaSet.getSecondaries().size(), size-1);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import brooklyn.entity.Entity;
import brooklyn.entity.basic.ApplicationBuilder;
import brooklyn.entity.basic.Entities;
import brooklyn.entity.nosql.mongodb.AbstractMongoDBServer;
import brooklyn.entity.nosql.mongodb.MongoDBReplicaSet;
import brooklyn.entity.nosql.mongodb.MongoDBTestHelper;
import brooklyn.entity.proxying.EntitySpec;
Expand All @@ -19,6 +20,7 @@
import brooklyn.test.entity.TestApplication;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.mongodb.DBObject;

public class MongoDBShardedDeploymentIntegrationTest {
Expand Down Expand Up @@ -87,9 +89,16 @@ public void testDeployedStructure() {
@Test(groups = "Integration")
private void testReadAndWriteDifferentRouters() {
MongoDBShardedDeployment deployment = makeAndStartDeployment();
EntityTestUtils.assertAttributeEqualsEventually(deployment, Startable.SERVICE_UP, true);
Iterator<Entity> routerIterator = deployment.getRouterCluster().getMembers().iterator();
MongoDBRouter router1 = (MongoDBRouter)routerIterator.next();
MongoDBRouter router2 = (MongoDBRouter)routerIterator.next();
EntityTestUtils.assertAttributeEqualsEventually(router1, Startable.SERVICE_UP, true);
EntityTestUtils.assertAttributeEqualsEventually(router2, Startable.SERVICE_UP, true);

for (Entity entity : Iterables.filter(app.getManagementContext().getEntityManager().getEntitiesInApplication(app), AbstractMongoDBServer.class)) {
EntityTestUtils.assertAttributeEqualsEventually(entity, Startable.SERVICE_UP, true);
}
String documentId = MongoDBTestHelper.insert(router1, "meaning-of-life", 42);
DBObject docOut = MongoDBTestHelper.getById(router2, documentId);
Assert.assertEquals(docOut.get("meaning-of-life"), 42);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import static com.google.common.base.Preconditions.checkNotNull;

public class BrooklynComponentTemplateResolver {

Expand Down Expand Up @@ -292,11 +291,10 @@ private void configureEntityConfig(EntitySpec<?> spec) {
protected Object transformSpecialFlags(Object flag, ManagementContext mgmt) {
if (flag instanceof EntitySpecConfiguration) {
EntitySpecConfiguration specConfig = (EntitySpecConfiguration) flag;
String type = (String) checkNotNull(specConfig.getSpecConfiguration().get("type"), "entitySpec must have key 'type'");
Class<? extends Entity> clazz = BrooklynEntityClassResolver.resolveEntity(type, mgmt);
@SuppressWarnings("unchecked")
Map<String, Object> config = (Map<String, Object>)transformSpecialFlags(specConfig.getSpecConfiguration(), mgmt);
return buildSpec(mgmt, clazz, config);
Map<String, Object> resolvedConfig = (Map<String, Object>)transformSpecialFlags(specConfig.getSpecConfiguration(), mgmt);
specConfig.setSpecConfiguration(resolvedConfig);
return Factory.newInstance(mgmt, specConfig.getSpecConfiguration()).resolveSpec();
}
return flag;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class EntitySpecConfiguration {
@SuppressWarnings("unused")
private static final Logger LOG = LoggerFactory.getLogger(EntitySpecConfiguration.class);

private final Map<String, Object> specConfiguration;
private Map<String, Object> specConfiguration;

public EntitySpecConfiguration(Map<String, ?> specConfiguration) {
this.specConfiguration = Maps.newHashMap(checkNotNull(specConfiguration, "specConfiguration"));
Expand All @@ -30,4 +30,11 @@ public EntitySpecConfiguration(Map<String, ?> specConfiguration) {
public Map<String, Object> getSpecConfiguration() {
return specConfiguration;
}

/**
* Allows BrooklynComponentTemplateResolver to traverse the configuration and resolve any entity specs
*/
public void setSpecConfiguration(Map<String, Object> specConfiguration) {
this.specConfiguration = specConfiguration;
}
}
2 changes: 1 addition & 1 deletion usage/cli/src/test/java/brooklyn/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void setUp() throws Exception {
public void tearDown() throws Exception {
if (executor != null) executor.shutdownNow();
if (app != null) Entities.destroyAll(app.getManagementContext());
if (exampleEntity != null) Entities.destroyAll(exampleEntity.getApplication().getManagementContext());
if (exampleEntity != null && exampleEntity.getApplication() != null) Entities.destroyAll(exampleEntity.getApplication().getManagementContext());
}

@Test
Expand Down

0 comments on commit 55c6ffa

Please sign in to comment.