WSO2 DAS is wso2 analytics platform which is capable of publishing an event to DAS and come up with summarized data for your environment. This document illustrates how to use DAS admin service to retrieve data by querying spark query on wso2 DAS. The common way of retrieving data from DAS is using REST API and using RDBMS and other event types like SMS etc. The most advantages with this method are which is very efficient when retrieving data from das and possible to query very complex spark querying that apache Lucene does not support. Also for this, we no need another extra Datasource like RDBMS to get data and we can retrieve data directly from DAS internal tables.
<dependencies> <dependency> <groupId>org.wso2.carbon.analytics</groupId> <artifactId>org.wso2.carbon.analytics.spark.stub</artifactId> <version>${carbon.analytics.version}</version> </dependency> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20160212</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.2.4</version> </dependency> </dependencies> <properties> <carbon.analytics.version>1.0.6-alpha3</carbon.analytics.version> </properties> <repositories> <repository> <id>wso2-nexus</id> <name>WSO2 internal Repository</name> <url>http://maven.wso2.org/nexus/content/groups/wso2-public/</url> <releases> <enabled>true</enabled> <updatePolicy>daily</updatePolicy> <checksumPolicy>ignore</checksumPolicy> </releases> </repository> </repositories>
String DAS_USERNAME = "admin"; String DAS_PASSWORD = "admin"; String DAS_ANALYTICS_PROCESSOR_SERVICE_URL = "https://localhost:9444/services/AnalyticsProcessorAdminService"; // initiating admin service stub for executing script AnalyticsProcessorAdminServiceStub stub = new AnalyticsProcessorAdminServiceStub( DAS_ANALYTICS_PROCESSOR_SERVICE_URL); ServiceClient client = stub._getServiceClient(); Options client_options = client.getOptions(); HttpTransportProperties.Authenticator authenticator = new HttpTransportProperties.Authenticator(); authenticator.setUsername(DAS_USERNAME); authenticator.setPassword(DAS_PASSWORD); authenticator.setPreemptiveAuthentication(true); client_options.setProperty( org.apache.axis2.transport.http.HTTPConstants.AUTHENTICATE, authenticator); client.setOptions(client_options);
AnalyticsQueryResultDto dto; String executeQeury = "select api, version, userId, apiPublisher, year, month, day, sum(total_request_count) as total_count from dataTable" + " group by api, version, userId, apiPublisher, year, month, day"; dto = stub.executeQuery(executeQeury);
private static Map<String, Object> convertToMap(AnalyticsQueryResultDto dto) throws JSONException { Map<String, Object> map = new HashMap<String, Object>(); // JSONArray payloadData = new // JSONObject(stream).getJSONArray("payloadData"); String[] cols = dto.getColumnNames(); AnalyticsRowResultDto[] rows = dto.getRowsResults(); for (int j = 0; j < rows.length; j++) { AnalyticsRowResultDto val = rows[j]; for (int i = 0; i < cols.length; i++) { map.put(cols[i], val.getColumnValues()[i]); } } return map; }
package com.rukspot.sample.wso2.das; import java.util.HashMap; import java.util.Map; import org.apache.axis2.client.Options; import org.apache.axis2.client.ServiceClient; import org.apache.axis2.transport.http.HttpTransportProperties; import org.json.JSONException; import org.wso2.carbon.analytics.spark.admin.stub.AnalyticsProcessorAdminServiceStub; import org.wso2.carbon.analytics.spark.admin.stub.AnalyticsProcessorAdminServiceStub.AnalyticsQueryResultDto; import org.wso2.carbon.analytics.spark.admin.stub.AnalyticsProcessorAdminServiceStub.AnalyticsRowResultDto; import com.google.gson.Gson; public class ExecuteQuery { public static void main(String[] args) throws Exception { System.setProperty("javax.net.ssl.trustStore", "/home/rukshan/wso2-jks/wso2carbon.jks"); System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon"); // TODO Auto-generated method stub executeScriptOnDas(); } public static void executeScriptOnDas() throws Exception { System.out.println("Starting APIM STAT Script"); String DAS_USERNAME = "admin"; String DAS_PASSWORD = "admin"; String DAS_ANALYTICS_PROCESSOR_SERVICE_URL = "https://localhost:9444/services/AnalyticsProcessorAdminService"; // initiating admin service stub for executing script AnalyticsProcessorAdminServiceStub stub = new AnalyticsProcessorAdminServiceStub( DAS_ANALYTICS_PROCESSOR_SERVICE_URL); ServiceClient client = stub._getServiceClient(); Options client_options = client.getOptions(); HttpTransportProperties.Authenticator authenticator = new HttpTransportProperties.Authenticator(); authenticator.setUsername(DAS_USERNAME); authenticator.setPassword(DAS_PASSWORD); authenticator.setPreemptiveAuthentication(true); client_options.setProperty( org.apache.axis2.transport.http.HTTPConstants.AUTHENTICATE, authenticator); client.setOptions(client_options); String createTempTable = "create temporary table dataTable USING CarbonAnalytics OPTIONS(tableName \"API_REQUEST_SUMMARY\")"; stub.executeQuery(createTempTable); String executeQeury = "select api, version, userId, apiPublisher, year, month, day, sum(total_request_count) as total_count from dataTable" + " group by api, version, userId, apiPublisher, year, month, day"; dto = stub.executeQuery(executeQeury); Gson g = new Gson(); System.out.println(g.toJson(dto.getColumnNames())); System.out.println(g.toJson(dto.getRowsResults())); Map<String, Object> map = convertToMap(dto); System.out.println(g.toJson(map)); } private static Map<String, Object> convertToMap(AnalyticsQueryResultDto dto) throws JSONException { Map<String, Object> map = new HashMap<String, Object>(); // JSONArray payloadData = new // JSONObject(stream).getJSONArray("payloadData"); String[] cols = dto.getColumnNames(); AnalyticsRowResultDto[] rows = dto.getRowsResults(); for (int j = 0; j < rows.length; j++) { AnalyticsRowResultDto val = rows[j]; for (int i = 0; i < cols.length; i++) { map.put(cols[i], val.getColumnValues()[i]); } } return map; } }
Add Comment
Comments (0)