@Test @Concurrency(count = 100) public void test() { setCallable(new RunnableCallable() { @Override public RunnableFuture run(int i) throws Exception { DistributedQueue queue = new DistributedQueue(ZooKeeperGetter.getZooKeeper(), "/queue", ZooDefs.Ids.OPEN_ACL_UNSAFE); String data = "data " + i; queue.offer(data.getBytes()); LOG.info("queue offer: " + data); Thread.sleep(new Random().nextInt(10) * 1000); LOG.info("queue poll: " + new String(queue.poll())); return RunnableFuture.DONE; } }); }
@Test public void testOffer1() throws Exception { String dir = "/testOffer1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } queueHandles[0].offer(testString.getBytes()); byte dequeuedBytes[] = queueHandles[0].remove(); Assert.assertEquals(new String(dequeuedBytes), testString); }
@Test public void testOffer2() throws Exception { String dir = "/testOffer2"; String testString = "Hello World"; final int num_clients = 2; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } queueHandles[0].offer(testString.getBytes()); byte dequeuedBytes[] = queueHandles[1].remove(); Assert.assertEquals(new String(dequeuedBytes), testString); }
@Test public void testTake1() throws Exception { String dir = "/testTake1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } queueHandles[0].offer(testString.getBytes()); byte dequeuedBytes[] = queueHandles[0].take(); Assert.assertEquals(new String(dequeuedBytes), testString); }
@Test public void testRemove1() throws Exception{ String dir = "/testRemove1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } try{ queueHandles[0].remove(); }catch(NoSuchElementException e){ return; } Assert.assertTrue(false); }
public void createNremoveMtest(String dir,int n,int m) throws Exception{ String testString = "Hello World"; final int num_clients = 2; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } for(int i=0; i< n; i++){ String offerString = testString + i; queueHandles[0].offer(offerString.getBytes()); } byte data[] = null; for(int i=0; i<m; i++){ data=queueHandles[1].remove(); } Assert.assertEquals(new String(data), testString+(m-1)); }
public void createNremoveMelementTest(String dir,int n,int m) throws Exception{ String testString = "Hello World"; final int num_clients = 2; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } for(int i=0; i< n; i++){ String offerString = testString + i; queueHandles[0].offer(offerString.getBytes()); } byte data[] = null; for(int i=0; i<m; i++){ data=queueHandles[1].remove(); } Assert.assertEquals(new String(queueHandles[1].element()), testString+m); }
public void testOffer1() throws Exception { String dir = "/testOffer1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } queueHandles[0].offer(testString.getBytes()); byte dequeuedBytes[] = queueHandles[0].remove(); assertEquals(new String(dequeuedBytes), testString); }
public void testOffer2() throws Exception { String dir = "/testOffer2"; String testString = "Hello World"; final int num_clients = 2; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } queueHandles[0].offer(testString.getBytes()); byte dequeuedBytes[] = queueHandles[1].remove(); assertEquals(new String(dequeuedBytes), testString); }
public void testTake1() throws Exception { String dir = "/testTake1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } queueHandles[0].offer(testString.getBytes()); byte dequeuedBytes[] = queueHandles[0].take(); assertEquals(new String(dequeuedBytes), testString); }
public void testRemove1() throws Exception{ String dir = "/testRemove1"; String testString = "Hello World"; final int num_clients = 1; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } try{ queueHandles[0].remove(); }catch(NoSuchElementException e){ return; } assertTrue(false); }
public void createNremoveMtest(String dir,int n,int m) throws Exception{ String testString = "Hello World"; final int num_clients = 2; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } for(int i=0; i< n; i++){ String offerString = testString + i; queueHandles[0].offer(offerString.getBytes()); } byte data[] = null; for(int i=0; i<m; i++){ data=queueHandles[1].remove(); } assertEquals(new String(data), testString+(m-1)); }
public void createNremoveMelementTest(String dir,int n,int m) throws Exception{ String testString = "Hello World"; final int num_clients = 2; ZooKeeper clients[] = new ZooKeeper[num_clients]; DistributedQueue queueHandles[] = new DistributedQueue[num_clients]; for(int i=0; i < clients.length; i++){ clients[i] = createClient(); queueHandles[i] = new DistributedQueue(clients[i], dir, null); } for(int i=0; i< n; i++){ String offerString = testString + i; queueHandles[0].offer(offerString.getBytes()); } byte data[] = null; for(int i=0; i<m; i++){ data=queueHandles[1].remove(); } assertEquals(new String(queueHandles[1].element()), testString+m); }