Audit Replication of Alfresco Content Services (ACS) to Elastic Search using Spring Boot and Apache Camel.
This project uses a Pull/Push integration model, where the ACS audit stream is pulled from the Rest API, and pushed over to Elastic Search. Once audit data is in Elastic Search, the Kibana UI can plug in to generate dashboards and charts based on audit actions inside of Alfresco Content Services.
Clone Project from github
Configure to connect to Elastic Search 5.6
Configure to connect to Alfresco Content Services 5.2
Log into Alfresco Content Services and perform some activities
Upload documents
Download documents
Edit documents
Log into Kibana and configure data source based on audit logs
Generate some charts, dashboards, and queries
git clone git@github.com:alex4u2nv/alfresco-audit-replication.git
Simple Camel Route to pull/push
package org.alfresco.eai.routes; import org.alfresco.eai.services.AuditRestService; import org.alfresco.eai.services.AuditSyncService; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.elasticsearch5.ElasticsearchConstants; import org.apache.camel.model.dataformat.JsonLibrary; import org.elasticsearch.index.IndexNotFoundException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class AuditExportRoute extends RouteBuilder { private static final Logger LOG = LoggerFactory.getLogger(AuditExportRoute.class); @Value("${route.audit.export.enabled}") private boolean routeEnabled; @Autowired AuditRestService auditRestService; @Override public void configure() throws Exception { if (!routeEnabled) { LOG.info("Audit Export not enabled"); return; } from("scheduler://pollAuditStream?backoffMultiplier={{alfresco.audit.backoffMultiplier" + "}}&backoffIdleThreshold={{alfresco.audit.backoffTreshold}}&delay={{alfresco.audit" + ".timer" + ".period}}") .bean("configurations", "auditSearchRequest") .doTry() .to("elasticsearch5://docker-cluster?operation=SEARCH&transportAddresses={{elasticsearch" + ".transportAddress}}&clientTransportSniff=false") .doCatch(IndexNotFoundException.class) .log("Index Not yet initialized") .doFinally() .bean("auditSyncService", "getStartId") .to("direct:fetchAudit"); from("direct:fetchAudit") .setHeader("from", header(AuditSyncService.FROM_INDEX)) .setHeader("to", header(AuditSyncService.TO_INDEX)) .setHeader(ElasticsearchConstants.PARAM_INDEX_NAME, simple("{{elasticsearch.audit" + ".index}}")) .setHeader(ElasticsearchConstants.PARAM_INDEX_TYPE, simple("{{elasticsearch.audit.type}}")) .bean("auditRestService", "fetchAudit(${header.from}, ${header.to})") .choice() .when(simple("${body.count} > 0")) .to("direct:updateElasticSearch") .otherwise() .setProperty(Exchange.SCHEDULER_POLLED_MESSAGES, simple("false")); from("direct:updateElasticSearch") .split(simple("${body.entries}")) .setHeader(ElasticsearchConstants.PARAM_INDEX_ID, simple("${body.id}")) .marshal().json(JsonLibrary.Gson) .to("elasticsearch5://docker-cluster?operation=INDEX&transportAddresses={{elasticsearch.transportAddress}}" + "&clientTransportSniff=false"); } }