@@ -58,7 +58,7 @@ public void start() {
5858 this .schemaURL = config ().getString ("schemaURL" );
5959 this .observers = config ().getJsonArray ("observers" );
6060 this .siotStubUrl = config ().getString ("siot_stub_url" );
61- this .logger = LoggerFactory .getInstance ().getLogger ();
61+ this .logger = LoggerFactory .getInstance ().getLogger ();
6262
6363 this .eb = vertx .eventBus ();
6464 this .eb .<JsonObject >consumer (this .url , onMessage ());
@@ -117,8 +117,8 @@ public Handler<Message<JsonObject>> onMessage() {
117117 logger .debug (logMessage + "New message -> " + message .body ().toString ());
118118 if (mandatoryFieldsValidator (message )) {
119119
120- logger .debug (logMessage + "[NewData] -> [Worker]-" + Thread .currentThread ().getName ()
121- + " \n [Data] " + message .body ());
120+ logger .debug (logMessage + "[NewData] -> [Worker]-" + Thread .currentThread ().getName () + " \n [Data] "
121+ + message .body ());
122122
123123 final JsonObject body = new JsonObject (message .body ().toString ()).getJsonObject ("body" );
124124 final String type = new JsonObject (message .body ().toString ()).getString ("type" );
@@ -138,8 +138,7 @@ public Handler<Message<JsonObject>> onMessage() {
138138
139139 JsonObject toSearch = new JsonObject ().put ("identity" , identity );
140140
141- logger .debug (
142- logMessage + "Search on " + this .collection + " with data" + toSearch .toString ());
141+ logger .debug (logMessage + "Search on " + this .collection + " with data" + toSearch .toString ());
143142
144143 mongoClient .find (this .collection , toSearch , res -> {
145144 if (res .result ().size () != 0 ) {
@@ -202,6 +201,7 @@ public Future<String> findDataObjectStream(String objURL, String guid) {
202201 Future <String > doStream = Future .future ();
203202
204203 mongoClient .find (this .dataObjectsCollection , new JsonObject ().put ("objURL" , objURL ), res -> {
204+ System .out .println ("dataobject find result(" + objURL + ") :" + res .result ().size ());
205205 for (int i = 0 ; i < res .result ().size (); i ++) {
206206 String currentGuid = res .result ().get (i ).getJsonObject ("metadata" ).getString ("guid" );
207207 if (currentGuid .equals (guid )) {
@@ -234,7 +234,8 @@ public void onNotification(JsonObject body) {
234234 Future <String > CheckURL = findDataObjectStream (objURL , guid );
235235 CheckURL .setHandler (asyncResult -> {
236236 if (asyncResult .succeeded ()) {
237- if (CheckURL == null ) {
237+
238+ if (asyncResult .result () == null ) {
238239 Future <Boolean > persisted = persistDataObjUserURL (streamID , guid , objURL , "reporter" );
239240 persisted .setHandler (res -> {
240241 if (res .succeeded ()) {
@@ -342,7 +343,8 @@ public Future<Boolean> persistDataObjUserURL(String streamID, String guid, Strin
342343 document .put ("guid" , guid );
343344 document .put ("type" , type );
344345
345- JsonObject toInsert = new JsonObject ().put ("url" , streamID ).put ("objURL" , objURL ).put ("metadata" , document );
346+ JsonObject toInsert = new JsonObject ().put ("url" , streamID ).put ("objURL" , objURL ).put ("metadata" , document )
347+ .put ("ratingType" , ratingType );
346348 logger .debug ("Creating DO entry -> " + toInsert .toString ());
347349
348350 mongoClient .save (dataObjectsCollection , toInsert , res2 -> {
@@ -363,10 +365,9 @@ public Future<Boolean> persistDataObjUserURL(String address, String guid, String
363365 document .put ("type" , type );
364366
365367 JsonObject toFind = new JsonObject ().put ("url" , address );
366- JsonObject toInsert = new JsonObject ().put ("url" , address )
367- .put ("metadata" , document )
368- .put ("ratingType" , ratingType );
369-
368+ JsonObject toInsert = new JsonObject ().put ("url" , address ).put ("metadata" , document ).put ("ratingType" ,
369+ ratingType );
370+
370371 logger .debug ("Creating DO entry -> " + toInsert .toString ());
371372 new Thread (() -> {
372373
@@ -379,7 +380,8 @@ public Future<Boolean> persistDataObjUserURL(String address, String guid, String
379380 System .out .println ("Setup complete - dataobjects + already exist, but ratingType undefined" );
380381 mongoClient .removeDocuments (dataObjectsCollection , toFind , deleteResult -> {
381382 mongoClient .save (dataObjectsCollection , toInsert , resInsert -> {
382- System .out .println ("Setup complete - dataobjects + Insert" + resInsert .result ().toString ());
383+ System .out .println (
384+ "Setup complete - dataobjects + Insert" + resInsert .result ().toString ());
383385 dataPersisted .complete (resInsert .succeeded ());
384386 });
385387 });
@@ -460,7 +462,6 @@ public boolean validateSource(String from, String address, JsonObject identity,
460462 logger .debug ("validating source ... from:" + from + "\n observers:" + observers .getList ().toString ()
461463 + "\n ourUserURL:" + this .identity .getJsonObject ("userProfile" ).getString ("userURL" ) + "\n COLLECTION:"
462464 + collection );
463-
464465
465466 return true ;
466467
@@ -542,90 +543,84 @@ public boolean mandatoryFieldsValidator(Message<JsonObject> message) {
542543 return true ;
543544
544545 }
545-
546-
547-
546+
548547 public void resumeDataObjects (String ratingType ) {
549-
550- JsonObject tofind = new JsonObject ().put ("ratingType" , ratingType );
551- logger .debug ("Resuming dataobjects ratingType-> " + ratingType );
552- mongoClient .find (dataObjectsCollection , tofind , allDataObjects -> {
553- logger .debug ("GetAllDataObjects complete - " + allDataObjects .result ().size ());
554- for (int i = 0 ; i < allDataObjects .result ().size (); i ++) {
555- String dataObjectUrl = allDataObjects .result ().get (i ).getString ("url" );
556- onChanges (dataObjectUrl );
557- }
558- });
559- }
548+
549+ JsonObject tofind = new JsonObject ().put ("ratingType" , ratingType );
550+ logger .debug ("Resuming dataobjects ratingType-> " + ratingType );
551+ mongoClient .find (dataObjectsCollection , tofind , allDataObjects -> {
552+ logger .debug ("GetAllDataObjects complete - " + allDataObjects .result ().size ());
553+ for (int i = 0 ; i < allDataObjects .result ().size (); i ++) {
554+ String dataObjectUrl = allDataObjects .result ().get (i ).getString ("url" );
555+ if (allDataObjects .result ().get (i ).containsKey ("objURL" )) {
556+ onChanges (allDataObjects .result ().get (i ).getString ("objURL" ));
557+ } else {
558+ onChanges (dataObjectUrl );
559+ }
560+
561+ }
562+ });
563+ }
560564
561565 public void cleanDuplicatedDataObjects () {
562-
566+
563567 mongoClient .find (dataObjectsCollection , new JsonObject (), resultHandler -> {
564-
565-
566-
567- //all documents
568+
569+ // all documents
568570 List <JsonObject > dataObjectsArray = resultHandler .result ();
569571 logger .info ("All dos size:" + dataObjectsArray .size ());
570-
571- //to fill with single url of each dataobject
572+
573+ // to fill with single url of each dataobject
572574 List <String > dataObjectsUrl = new ArrayList <String >();
573-
575+
574576 if (dataObjectsArray .size () > 0 ) {
575577 int i ;
576578 for (i = 0 ; i < dataObjectsArray .size (); i ++) {
577-
579+
578580 JsonObject currentDO = dataObjectsArray .get (i );
579581 String url = currentDO .getString ("url" );
580-
582+
581583 if (!currentDO .containsKey ("objURL" ) && !dataObjectsUrl .contains (url )) {
582- //add to list
584+ // add to list
583585 dataObjectsUrl .add (url );
584-
586+
585587 JsonObject toRemove = new JsonObject ().put ("url" , url );
586-
587-
588- //JsonObject document = currentDO.getJsonObject("metadata");
589588
590- //JsonObject toAdd = new JsonObject().put("url", url)
591- // .put("metadata", document);
589+ // JsonObject document = currentDO.getJsonObject("metadata");
590+
591+ // JsonObject toAdd = new JsonObject().put("url", url)
592+ // .put("metadata", document);
592593 currentDO .remove ("_id" );
593-
594+
594595 mongoClient .removeDocuments (dataObjectsCollection , toRemove , deleteResult -> {
595596 mongoClient .save (dataObjectsCollection , currentDO , resInsert -> {
596597 logger .info ("Setup complete - dataobjects + Insert" + resInsert .result ().toString ());
597598 });
598599 });
599-
600+
600601 }
601602 }
602603 logger .info ("single do size:" + dataObjectsUrl .size ());
603604 }
604-
605-
606- /*
607- mongoClient.find(dataObjectsCollection, new JsonObject(), res2 -> {
608- if (res2.result().size() > 0) {
609- if (res2.result().get(0).containsKey("ratingType")) {
610- System.out.println("Setup complete - dataobjects + already exist");
611- dataPersisted.complete(true);
612- } else {
613- System.out.println("Setup complete - dataobjects + already exist, but ratingType undefined");
614- mongoClient.removeDocuments(dataObjectsCollection, toFind, deleteResult -> {
615- mongoClient.save(dataObjectsCollection, toInsert, resInsert -> {
616- System.out.println("Setup complete - dataobjects + Insert" + resInsert.result().toString());
617- dataPersisted.complete(resInsert.succeeded());
618- });
619- });
620- }
621-
622- } else {
623- mongoClient.save(dataObjectsCollection, toInsert, resInsert -> {
624- System.out.println("Setup complete - dataobjects + Insert" + resInsert.result().toString());
625- dataPersisted.complete(resInsert.succeeded());
626- });
627- }
628- });*/
605+
606+ /*
607+ * mongoClient.find(dataObjectsCollection, new JsonObject(), res2 -> { if
608+ * (res2.result().size() > 0) { if
609+ * (res2.result().get(0).containsKey("ratingType")) {
610+ * System.out.println("Setup complete - dataobjects + already exist");
611+ * dataPersisted.complete(true); } else { System.out.
612+ * println("Setup complete - dataobjects + already exist, but ratingType undefined"
613+ * ); mongoClient.removeDocuments(dataObjectsCollection, toFind, deleteResult ->
614+ * { mongoClient.save(dataObjectsCollection, toInsert, resInsert -> {
615+ * System.out.println("Setup complete - dataobjects + Insert" +
616+ * resInsert.result().toString());
617+ * dataPersisted.complete(resInsert.succeeded()); }); }); }
618+ *
619+ * } else { mongoClient.save(dataObjectsCollection, toInsert, resInsert -> {
620+ * System.out.println("Setup complete - dataobjects + Insert" +
621+ * resInsert.result().toString());
622+ * dataPersisted.complete(resInsert.succeeded()); }); } });
623+ */
629624 });
630625 }
631626}
0 commit comments