diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/pom.xml b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/pom.xml index a4fcf0c3..97e7ab5a 100644 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/pom.xml +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/pom.xml @@ -111,7 +111,12 @@ org.apache.plc4x.merlot.archiver.test.NewMain - + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.0 + @@ -236,5 +241,12 @@ lz4 1.3.0 + + junit + junit + 4.13.2 + test + jar + diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/command/MerlotHtcCommand.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/command/MerlotHtcCommand.java index 37b529fe..9d751cd3 100644 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/command/MerlotHtcCommand.java +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/command/MerlotHtcCommand.java @@ -16,12 +16,8 @@ */ package org.apache.plc4x.merlot.archiver.command; - import java.time.Instant; -import java.time.LocalDateTime; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.commons.lang3.tuple.Pair; import org.apache.karaf.shell.api.action.Action; import org.apache.karaf.shell.api.action.Argument; import org.apache.karaf.shell.api.action.Command; @@ -30,52 +26,56 @@ import org.apache.karaf.shell.api.action.lifecycle.Service; import org.apache.karaf.shell.support.table.ShellTable; import org.apache.plc4x.merlot.archiver.api.MerlotHtc; -import org.epics.gpclient.PV; +import org.apache.plc4x.merlot.archiver.impl.MerlotHtcIoTDBImpl; +import org.apache.plc4x.merlot.archiver.impl.MerlotHtcRTImpl; +import org.epics.vtype.Scalar; +import org.epics.vtype.Time; +import org.epics.vtype.VType; import org.osgi.framework.BundleContext; +import org.slf4j.LoggerFactory; @Service @Command(scope = "plc4x", name = "htc", description = "Historical collector admin.") -public class MerlotHtcCommand implements Action { +public class MerlotHtcCommand implements Action { @Reference BundleContext bc; - + @Reference - volatile List htcs; - + volatile List htcs; + @Option(name = "-l", aliases = "--list", description = "List collector PVs.", required = false, multiValued = false) - String strHtc; + String strHtc; @Option(name = "-p", aliases = "--pv", description = "Displays the stored values ​​of the process variable.", required = false, multiValued = false) String strPV; @Option(name = "-a", aliases = "--add", description = "The designated PV tags must be incorporated into the collector.", required = false, multiValued = false) - Boolean blnAdd; - + Boolean blnAdd; + @Option(name = "-f", aliases = "--from", description = "Start date for data query.", required = false, multiValued = false) - String from; - + String from; + @Option(name = "-t", aliases = "--to", description = "End date for data consultation.", required = false, multiValued = false) - String to; - + String to; + @Option(name = "-r", aliases = "--remove", description = "The designated PV tags must be remove from the collector.", required = false, multiValued = false) - String strRemovePV; - - + String strRemovePV; + @Argument(index = 0, name = "htc", description = "The Collector to which the PV tag will be assigned.", required = false, multiValued = false) - String strMainHtc; + String strMainHtc; @Argument(index = 1, name = "rate", description = "Maximum rate at which a tag change must be acquired.", required = false, multiValued = false) - String strMaxRate; - + String strMaxRate; + @Argument(index = 2, name = "pvs", description = "List of PVS tags to htc.", required = false, multiValued = true) - List strPVs; - - + List strPVs; + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MerlotHtcCommand.class); @Override public Object execute() throws Exception { - + if ((null == strHtc) && (null == strMainHtc) ){ ListCollectorsServices(null); } else if ((null != strHtc) && (null == strMainHtc) && (null == strPV)) { @@ -87,75 +87,75 @@ public Object execute() throws Exception { } else if ((null != strRemovePV) && (null != strMainHtc)) { removePV(strMainHtc, strRemovePV); } + return null; } - - private void ListCollectorsServices(String id){ + + private void ListCollectorsServices(String id) { if (null == id) { ShellTable table = new ShellTable(); table.column("Uid"); table.column("Key"); htcs.forEach(htc -> { - table.addRow().addContent(htc.getID(),htc.getPVs().size()); + table.addRow().addContent(htc.getID(), htc.getPVs().size()); }); table.print(System.out); - + } else { htcs.forEach(h -> { - if(h.getID().equalsIgnoreCase(id)){ + if (h.getID().equalsIgnoreCase(id)) { int count = 1; ShellTable table = new ShellTable(); table.column("Item"); - table.column("PV"); + table.column("PV"); h.getPVs().forEach(pc -> { table.addRow().addContent(count, pc); }); - table.print(System.out); + table.print(System.out); } }); - + } } - - private void ListHistoricalValues(String pv, String init, String end){ + + private void ListHistoricalValues(String pv, String init, String end) { int[] counter = new int[1]; htcs.forEach(h -> { - if(h.getID().equalsIgnoreCase(strHtc)){ - String strInit = (null == init)?Instant.MIN.toString():init; - String strEnd = (null == end)?Instant.MAX.toString():end; + if (h.getID().equalsIgnoreCase(strHtc)) { + String strInit = (null == init) ? Instant.MIN.toString() : init; + String strEnd = (null == end) ? Instant.MAX.toString() : end; var pvs = h.getPVs(pv, strInit, strEnd); ShellTable table = new ShellTable(); table.column("Date"); - table.column("Value"); + table.column("Value"); counter[0] = 1; - pvs.forEach((p) ->{ - table.addRow().addContent(counter[0], p.toString()); + pvs.forEach((p) -> { + table.addRow().addContent(counter[0], p.toString()); counter[0]++; }); - table.print(System.out); + table.print(System.out); } - }); + }); } - + private void addPV(String pv, String maxRate, List listPVs) { - htcs.forEach(h -> { - if(h.getID().equalsIgnoreCase(strMainHtc)){ - listPVs.forEach(s ->{ - h.addPV(s, Double.MAX_VALUE); - }); - } - }); + htcs.forEach(h -> { + if (h.getID().equalsIgnoreCase(strMainHtc)) { + listPVs.forEach(s -> { + h.addPV(s, Double.MAX_VALUE); + }); + } + }); } - + private void removePV(String htc, String strpv) { - htcs.forEach(h -> { - if(h.getID().equalsIgnoreCase(htc)){ - h.removePV(strpv); - } - }); - } - - + htcs.forEach(h -> { + if (h.getID().equalsIgnoreCase(htc)) { + h.removePV(strpv); + } + }); + } + } diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/core/MerlotIoTDBMapping.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/core/MerlotIoTDBMapping.java new file mode 100644 index 00000000..8ce8ebb2 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/core/MerlotIoTDBMapping.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archiver.core; + +import com.google.protobuf.ByteString; +import java.util.function.BiFunction; +import org.apache.iotdb.pipe.api.type.Type; +import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarByte; +import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarDouble; +import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarFloat; +import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarInt; +import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarString; + +public enum MerlotIoTDBMapping { + INT32(Type.INT32, (val, meta) + -> ScalarInt.newBuilder() + .setSecondsintoyear(meta.seconds) + .setNano(meta.nano) + .setVal((Integer) val) + .setSeverity(meta.severity) + .setStatus(meta.status) + .build()), + + INT64(Type.INT64, (val, meta) -> { + long numericValue = ((Number) val).longValue(); + + return org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarDouble.newBuilder() + .setSecondsintoyear(meta.seconds) + .setNano(meta.nano) + .setVal((double) numericValue) + .setSeverity(meta.severity) + .setStatus(meta.status) + .build(); + }), + + FLOAT(Type.FLOAT, (val, meta) + -> ScalarFloat.newBuilder() + .setSecondsintoyear(meta.seconds) + .setNano(meta.nano) + .setVal((Float) val) + .setSeverity(meta.severity) + .setStatus(meta.status) + .build()), + DOUBLE(Type.DOUBLE, (val, meta) + -> ScalarDouble.newBuilder() + .setSecondsintoyear(meta.seconds) + .setNano(meta.nano) + .setVal((Double) val) + .setSeverity(meta.severity) + .setStatus(meta.status) + .build()), + TEXT(Type.TEXT, (val, meta) + -> ScalarString.newBuilder() + .setSecondsintoyear(meta.seconds) + .setNano(meta.nano) + .setVal(val.toString()) + .setSeverity(meta.severity) + .setStatus(meta.status) + .build()), + BOOLEAN(Type.BOOLEAN, (val, meta) + -> ScalarByte.newBuilder() + .setSecondsintoyear(meta.seconds) + .setNano(meta.nano) + .setVal(ByteString.copyFrom(new byte[]{(byte) ((boolean) val ? 1 : 0)})) + .setSeverity(meta.severity) + .setStatus(meta.status) + .build()); + + + private final Type iotdbType; + private final BiFunction converter; + + MerlotIoTDBMapping(Type iotdbType, BiFunction converter) { + this.iotdbType = iotdbType; + this.converter = converter; + } + + public Object convert(Object value, EpicsMetadata meta) { + return this.converter.apply(value, meta); + } + + public static MerlotIoTDBMapping fromIotdb(Type type) { + for (MerlotIoTDBMapping c : values()) { + if (c.iotdbType == type) { + return c; + } + } + throw new UnsupportedOperationException("IoTDB type not supported: " + type); + } + + /** + * Container for additional fields defined in EPICSEvent.proto + */ + public static class EpicsMetadata { + + public final int seconds; + public final int nano; + public final int severity; + public final int status; + + public EpicsMetadata(int seconds, int nano, int severity, int status) { + this.seconds = seconds; + this.nano = nano; + this.severity = severity; + this.status = status; + } + } +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/core/MerlotPBRawSerializer.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/core/MerlotPBRawSerializer.java index 51af9f84..8e34d0c3 100644 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/core/MerlotPBRawSerializer.java +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/core/MerlotPBRawSerializer.java @@ -16,38 +16,100 @@ */ package org.apache.plc4x.merlot.archiver.core; +import com.google.protobuf.ByteString; import org.apache.plc4x.merlot.api.PB.EPICSEvent; import org.epics.vtype.VType; import java.io.IOException; import java.io.OutputStream; -import java.time.Instant; -import java.time.ZoneId; +import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.time.ZonedDateTime; import java.util.List; import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarDouble; +import org.epics.vtype.AlarmProvider; +import org.epics.vtype.Time; +import org.epics.vtype.TimeProvider; +import org.epics.vtype.VByte; import org.epics.vtype.VDouble; - +import org.epics.vtype.VFloat; +import org.epics.vtype.VInt; +import org.epics.vtype.VString; +import org.slf4j.LoggerFactory; /** - * Clase utilitaria para la generación de archivos en formato PBRAW compatibles - * con el EPICS Archiver Appliance. + * Utility class for generating PBRAW-format files compatible with the EPICS + * Archiver Appliance. */ public final class MerlotPBRawSerializer { - // Constructor privado para evitar instanciación de clase utilitaria - private MerlotPBRawSerializer() {} + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MerlotPBRawSerializer.class); + + private MerlotPBRawSerializer() { + } + + public static void serializeIoTDBToPBRaw(List events, String pvName, OutputStream out) throws IOException { + if (events == null || events.isEmpty()) { + throw new IllegalArgumentException("The list of events is empty or null"); + } + + VType primerElemento = events.get(0); + EPICSEvent.PayloadType type; + int year; + + if (primerElemento instanceof VDouble) { + type = EPICSEvent.PayloadType.SCALAR_DOUBLE; + year = ((VDouble) primerElemento).getTime().getTimestamp().atOffset(ZoneOffset.UTC).getYear(); + } else if (primerElemento instanceof VInt) { + type = EPICSEvent.PayloadType.SCALAR_INT; + year = ((VInt) primerElemento).getTime().getTimestamp().atOffset(ZoneOffset.UTC).getYear(); + } else if (primerElemento instanceof VFloat) { + type = EPICSEvent.PayloadType.SCALAR_FLOAT; + year = ((VFloat) primerElemento).getTime().getTimestamp().atOffset(ZoneOffset.UTC).getYear(); + } else if (primerElemento instanceof VString) { + type = EPICSEvent.PayloadType.SCALAR_STRING; + year = ((VString) primerElemento).getTime().getTimestamp().atOffset(ZoneOffset.UTC).getYear(); + } else if (primerElemento instanceof VByte) { + type = EPICSEvent.PayloadType.SCALAR_BYTE; + year = ((VByte) primerElemento).getTime().getTimestamp().atOffset(ZoneOffset.UTC).getYear(); + } else { + throw new UnsupportedOperationException("Class not supported for mapping: " + primerElemento.getClass().getName()); + } + + // Response to Phoebus + EPICSEvent.PayloadInfo info = EPICSEvent.PayloadInfo.newBuilder() + .setPvname(pvName) + .setType(type) + .setYear(year) + .setElementCount(1) + .build(); + out.write(info.toByteArray()); + out.write('\n'); + + for (int i = 0; i < events.size(); i++) { + byte[] data = serializeIoTDBToBytes(events.get(i)); + + //Samples converted to PB + if (data != null) { + + out.write(data); + out.write('\n'); + } else { + LOGGER.warn("The event {} could not be serialized (null)", i); + } + } + + out.flush(); + } /** - * Serializa una lista de eventos VType a un archivo en formato .pbraw. + * Serializa una lista de eventos VType a un archivo en formato .pbraw. * - * @param events Lista de eventos provenientes de la red de control. - * @param pvName Nombre de la Variable de Proceso (PV). + * @param events Lista de eventos provenientes de la red de control. + * @param pvName Nombre de la Variable de Proceso (PV). * @param fileName Nombre del archivo de salida. * @throws IOException Si ocurre un error durante la escritura. */ public static void serializeToPBRaw(List events, String pvName, OutputStream out) throws IOException { - + if (events == null || events.isEmpty()) { return; } @@ -55,13 +117,14 @@ public static void serializeToPBRaw(List events, String pvName, OutputStr // 1. Escribir el PayloadInfo (Metadatos obligatorios para pbrawclient) // Se asume el tipo basado en el primer elemento de la lista MerlotPayloadMapping mapping = MerlotPayloadMapping.fromVType(events.get(0)); + if (mapping == null) { throw new IOException("Tipo de VType no soportado para serialización."); } EPICSEvent.PayloadInfo info = EPICSEvent.PayloadInfo.newBuilder() .setPvname(pvName) - .setType(EPICSEvent.PayloadType.SCALAR_DOUBLE) + .setType(EPICSEvent.PayloadType.SCALAR_DOUBLE) .setYear(((VDouble) events.get(0)) .getTime() .getTimestamp() @@ -71,8 +134,8 @@ public static void serializeToPBRaw(List events, String pvName, OutputStr // info.writeDelimitedTo(out); out.write(info.toByteArray()); - out.write('\n'); - + out.write('\n'); + // // // Tiempo actual // Instant now = Instant.now(); @@ -85,8 +148,6 @@ public static void serializeToPBRaw(List events, String pvName, OutputStr // + zdt.getHour() * 3600 // + zdt.getMinute() * 60 // + zdt.getSecond(); - - // 2. Serializar cada evento VType usando la factoría MerlotPayloadMapping for (VType vType : events) { Object pbEvent = MerlotPayloadMapping.createEvent(vType); @@ -94,8 +155,8 @@ public static void serializeToPBRaw(List events, String pvName, OutputStr if (pbEvent != null) { // writeEventToStream(out, pbEvent); out.write(((ScalarDouble) pbEvent).toByteArray()); - - out.write('\n'); + + out.write('\n'); } } // @@ -108,13 +169,21 @@ public static void serializeToPBRaw(List events, String pvName, OutputStr // .build(); // // out.write(event.toByteArray()); - - + out.flush(); } /** - * Escribe el objeto de Protocol Buffers en el stream usando formato delimitado. + * Serializa una lista de eventos Type a un archivo en formato .pbraw. + * + * @param events Lista de eventos provenientes de IoTDB Database. + * @param pvName Nombre de la Variable de Proceso (PV). + * @param fileName Nombre del archivo de salida. + * @throws IOException Si ocurre un error durante la escritura. + */ + /** + * Escribe el objeto de Protocol Buffers en el stream usando formato + * delimitado. */ private static void writeEventToStream(OutputStream out, Object pbEvent) throws IOException { if (pbEvent instanceof com.google.protobuf.MessageLite) { @@ -125,13 +194,14 @@ private static void writeEventToStream(OutputStream out, Object pbEvent) throws if (bytes != null) { // writeVarint32(out, bytes.length); out.write(bytes); - out.write(0x0A); + out.write(0x0A); } } } /** - * Utilidad para escribir el prefijo de tamaño (Varint32) requerido por el protocolo. + * Utilidad para escribir el prefijo de tamaño (Varint32) requerido por el + * protocolo. */ private static void writeVarint32(OutputStream out, int value) throws IOException { while (true) { @@ -146,15 +216,75 @@ private static void writeVarint32(OutputStream out, int value) throws IOExceptio } /** - * Convierte el objeto del evento en su representación de bytes. + * Convierte el objeto del evento en su representación de bytes. */ private static byte[] serializeToBytes(Object pbEvent) { if (pbEvent instanceof EPICSEvent.ScalarDouble) { return ((EPICSEvent.ScalarDouble) pbEvent).toByteArray(); } else if (pbEvent instanceof EPICSEvent.ScalarInt) { return ((EPICSEvent.ScalarInt) pbEvent).toByteArray(); + } else if (pbEvent instanceof EPICSEvent.ScalarFloat) { + return ((EPICSEvent.ScalarFloat) pbEvent).toByteArray(); + } else if (pbEvent instanceof EPICSEvent.ScalarByte) { + return ((EPICSEvent.ScalarByte) pbEvent).toByteArray(); } // Añadir otros tipos según sea necesario return null; } + + private static byte[] serializeIoTDBToBytes(VType event) { + if (event == null) { + return null; + } + + Time eventTime = ((TimeProvider) event).getTime(); + OffsetDateTime time = eventTime.getTimestamp().atOffset(ZoneOffset.UTC); + int secondsIntoYear = (int) (time.toEpochSecond() + - time.withDayOfYear(1).withHour(0).withMinute(0).withSecond(0).toEpochSecond()); + int nanos = eventTime.getTimestamp().getNano(); + int severity = ((AlarmProvider) event).getAlarm().getSeverity().ordinal(); + + if (event instanceof VDouble) { + return EPICSEvent.ScalarDouble.newBuilder() + .setSecondsintoyear(secondsIntoYear) + .setNano(nanos) + .setVal(((VDouble) event).getValue()) + .setSeverity(severity) + .build().toByteArray(); + + } else if (event instanceof VInt) { + return EPICSEvent.ScalarInt.newBuilder() + .setSecondsintoyear(secondsIntoYear) + .setNano(nanos) + .setVal(((VInt) event).getValue()) + .setSeverity(severity) + .build().toByteArray(); + + } else if (event instanceof VFloat) { + return EPICSEvent.ScalarFloat.newBuilder() + .setSecondsintoyear(secondsIntoYear) + .setNano(nanos) + .setVal(((VFloat) event).getValue()) + .setSeverity(severity) + .build().toByteArray(); + + } else if (event instanceof VString) { + return EPICSEvent.ScalarString.newBuilder() + .setSecondsintoyear(secondsIntoYear) + .setNano(nanos) + .setVal(((VString) event).getValue()) + .setSeverity(severity) + .build().toByteArray(); + + } else if (event instanceof VByte) { + return EPICSEvent.ScalarByte.newBuilder() + .setSecondsintoyear(secondsIntoYear) + .setNano(nanos) + .setVal(ByteString.copyFrom(new byte[]{((VByte) event).getValue()})) + .setSeverity(severity) + .build().toByteArray(); + } + //If the type is not supported + return null; + } } diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBGetDataPVImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBGetDataPVImpl.java index 3535f9fd..8d8760e9 100644 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBGetDataPVImpl.java +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBGetDataPVImpl.java @@ -16,8 +16,93 @@ */ package org.apache.plc4x.merlot.archiver.impl; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.plc4x.merlot.archiver.api.MerlotHtc; +import org.apache.plc4x.merlot.archiver.core.MerlotPBRawSerializer; +import org.epics.vtype.VType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MerlotDataBrowserIoTDBGetDataPVImpl extends HttpServlet { - + + private static final Logger LOGGER = LoggerFactory.getLogger(MerlotDataBrowserRTGetDataPVImpl.class); + + private final Pattern opti_pattern = Pattern.compile("optimized_11520\\(([^)]+)\\)"); + private final Pattern ncount_pattern = Pattern.compile("ncount\\(([^)]+)\\)"); + private final Pattern count_pattern = Pattern.compile("count_3600\\(([^)]+)\\)"); + + private Matcher opti_matcher = null; + private Matcher ncount_matcher = null; + private Matcher count_matcher = null; + + private final MerlotHtc mhtc; + + public MerlotDataBrowserIoTDBGetDataPVImpl(MerlotHtc mhtc) { + this.mhtc = mhtc; + } + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + + + String from = req.getParameter("from"); + String to = req.getParameter("to"); + String[] pvs = req.getParameterValues("pv"); + + + LOGGER.info("Inicio Servlet."); + if ((null == from) || (null == to)) { + return; + } + if ((null == pvs) || (pvs.length == 0)) { + return; + } + + resp.setContentType("application/octet-stream"); + for (String pv : pvs) { + opti_matcher = opti_pattern.matcher(pv); + ncount_matcher = ncount_pattern.matcher(pv); + count_matcher = count_pattern.matcher(pv); + + if (opti_matcher.matches()) { + LOGGER.info("optimized_11520(pv) not supported."); + } else if (ncount_matcher.matches()) { + String strpv = ncount_matcher.group(1); + int countpv = mhtc.countPVs(strpv, from, to); + LOGGER.info("Number of events: " + countpv); + resp.getWriter().print(countpv); + resp.getWriter().close(); + } else if (count_matcher.matches()) { + LOGGER.info("count_3600(pv) not supported."); + } else { + createRawResponse(pv, from, to, resp.getOutputStream()); + resp.getOutputStream().close(); + } + } + + } + + private void createRawResponse(String pv, String from, String to, OutputStream out) throws IOException { + + List values = mhtc.getPVs(pv, from, to); + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + MerlotPBRawSerializer.serializeIoTDBToPBRaw(values, pv, bout); + ByteBuf buf = Unpooled.wrappedBuffer(bout.toByteArray()); + System.out.println(ByteBufUtil.prettyHexDump(buf)); + out.write(bout.toByteArray()); + } + } diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBSearchPVImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBSearchPVImpl.java index 47e26b5b..da19fb7b 100644 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBSearchPVImpl.java +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotDataBrowserIoTDBSearchPVImpl.java @@ -22,23 +22,23 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; - +import org.apache.plc4x.merlot.archiver.api.MerlotHtc; public class MerlotDataBrowserIoTDBSearchPVImpl extends HttpServlet { + private final MerlotHtc mhtc; + + public MerlotDataBrowserIoTDBSearchPVImpl(MerlotHtc mhtc) { + this.mhtc = mhtc; + } + @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { resp.setContentType("text/plain"); try (PrintWriter writer = resp.getWriter()) { - writer.println("uno"); - writer.println("dos"); - writer.println("tres"); - writer.println("cuatro"); - writer.println("cinco"); - writer.println("seis"); - writer.println("siete"); - writer.println("ocho"); - } + mhtc.getPVs().forEach(s -> writer.println(s)); + } + } - + } diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcIoTDBImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcIoTDBImpl.java index 18a54faa..d59b1448 100755 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcIoTDBImpl.java +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcIoTDBImpl.java @@ -16,11 +16,31 @@ */ package org.apache.plc4x.merlot.archiver.impl; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.iotdb.isession.pool.SessionDataSetWrapper; +import org.apache.iotdb.pipe.api.type.Type; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.pool.SessionPool; -import org.apache.plc4x.merlot.archiver.api.MerlotGPClient; import org.apache.plc4x.merlot.archiver.api.MerlotHtc; +import org.apache.plc4x.merlot.archiver.core.MerlotIoTDBMapping; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.epics.vtype.Alarm; +import org.epics.vtype.Display; +import org.epics.vtype.Time; +import org.epics.vtype.VByte; +import org.epics.vtype.VDouble; +import org.epics.vtype.VFloat; +import org.epics.vtype.VInt; +import org.epics.vtype.VString; import org.epics.vtype.VType; import org.slf4j.LoggerFactory; @@ -29,71 +49,193 @@ * @author cgarcia */ public class MerlotHtcIoTDBImpl implements MerlotHtc { - private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MerlotHtcIoTDBImpl.class); - - + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MerlotHtcIoTDBImpl.class); + private static final String strID = "iotdb"; - private SessionPool sp ; - + private SessionPool sp; + + public MerlotHtcIoTDBImpl() { - public MerlotHtcIoTDBImpl(MerlotGPClient gpMerlotClient) { - } - + @Override public void init() { - sp = new SessionPool.Builder() - .host("192.168.0.218") - .port(6667) - .user("root") - .password("root") - .maxSize(5) - .build(); - - if (sp != null) { - LOGGER.info("Connection sucess"); + + } + + public SessionPool getIoTDBConnection() throws IoTDBConnectionException, StatementExecutionException { + try { + //TODO: Leer datos desde archivo cfg + sp = new SessionPool.Builder() + .nodeUrls(Arrays.asList("192.168.31.202:6667")) + .user("root") + .password("root") + .maxSize(10) + .enableAutoFetch(false) + .build(); + + if (sp != null) { + return sp; + } + } catch (Exception e) { + System.out.println("Error: " + e.getMessage()); } + return null; } @Override public void destroy() { if (sp != null) { - sp.close(); + sp.close(); } } @Override public String getID() { return strID; - } - + } + @Override public void addPV(String strPV, Double interval) { // } @Override - public void removePV(String strPV) { + public void removePV(String strPV + ) { // } @Override public Set getPVs() { - - return null; + Set pvs = new HashSet<>(); + + String pvNameQuery = "SHOW TIMESERIES root.**"; + + try (SessionDataSetWrapper ds = getIoTDBConnection().executeQueryStatement(pvNameQuery)) { + while (ds.hasNext()) { + + String fullPath = ds.next().getFields().get(0).getStringValue(); + + pvs.add(fullPath.replaceFirst("^[^.]+\\.(.*)", "$1")); + } + } catch (Exception ex) { + LOGGER.error("Error retrieving PVs from IoTDB: {}", ex.getMessage()); + } + + return pvs; } @Override public List getPVs(String strPV, String init, String end) { - return null; + List listResult = new ArrayList<>(); + + try { + String device = getBasePath(strPV); + String measurement = getTimeserieNameSimple(strPV); + + long startT = Instant.parse(init).toEpochMilli(); + long endT = Instant.parse(end).toEpochMilli(); + + String sql = String.format("SELECT %s FROM root.%s WHERE time >= %d AND time <= %d", + measurement, device, startT, endT); + try (SessionDataSetWrapper dataSet = getIoTDBConnection().executeQueryStatement(sql)) { + + String typeStr = dataSet.getColumnTypes().get(1); + Type iotdbType = Type.valueOf(typeStr); + MerlotIoTDBMapping mapper = MerlotIoTDBMapping.fromIotdb(iotdbType); + + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + long timestamp = record.getTimestamp(); + Field field = record.getFields().get(0); + + if (field.getDataType() != null) { + Instant inst = Instant.ofEpochMilli(timestamp); + + MerlotIoTDBMapping.EpicsMetadata meta + = new MerlotIoTDBMapping.EpicsMetadata((int) inst.getEpochSecond(), inst.getNano(), 0, 0); + + Object epicsEvent = mapper.convert(field.getObjectValue(field.getDataType()), meta); + listResult.addAll(translateToScalarType(inst, epicsEvent)); + } + } + } + } catch (Exception e) { + LOGGER.error("Error retrieving PVs from IoTDB", e); + } + + for (VType vType : listResult) { + LOGGER.info("EpicsEvent: {}", vType.toString()); + } + return listResult; + } + + private static List translateToScalarType(Instant inst, Object epicsEvent) { + List listEvents = new ArrayList<>(); + + // --- CONVERSION BASED STRICTLY ON MerlotIoTDBMapping --- + if (epicsEvent instanceof VType) { + listEvents.add((VType) epicsEvent); + } else if (epicsEvent instanceof org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarDouble) { + var pbEvent = (org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarDouble) epicsEvent; + listEvents.add(VDouble.of(pbEvent.getVal(), Alarm.none(), Time.of(inst), Display.none())); + } else if (epicsEvent instanceof org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarInt) { + var pbEvent = (org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarInt) epicsEvent; + listEvents.add(VInt.of(pbEvent.getVal(), Alarm.none(), Time.of(inst), Display.none())); + } else if (epicsEvent instanceof org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarFloat) { + var pb = (org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarFloat) epicsEvent; + listEvents.add(VFloat.of(pb.getVal(), Alarm.none(), Time.of(inst), Display.none())); + } else if (epicsEvent instanceof org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarString) { + var pb = (org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarString) epicsEvent; + listEvents.add(VString.of(pb.getVal(), Alarm.none(), Time.of(inst))); + } else if (epicsEvent instanceof org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarByte) { + var pb = (org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarByte) epicsEvent; + + byte byteVal = pb.getVal().isEmpty() ? 0 : pb.getVal().byteAt(0); + + listEvents.add(VByte.of(byteVal, Alarm.none(), Time.of(inst), Display.none())); + } else { + LOGGER.warn("PB event type not supported in the final conversion: {}", epicsEvent.getClass().getName()); + } + return listEvents; } @Override - public int countPVs(String strPV, String init, String end) { + public int countPVs(String strPV, String init, + String end + ) { + return 0; } - - - - + + /* + Returns the variable stored in the IoTDB device (pvName) + */ + private static String getTimeserieNameSimple(String pvName) { + Pattern pattern = Pattern.compile("[^.]+$"); + Matcher matcher = pattern.matcher(pvName); + + if (matcher.find()) { + return matcher.group(); + } + + return ""; + } + + /* + Returns the base path contained in the IoTDB device + */ + public static String getBasePath(String pvName) { + if (pvName == null || pvName.isEmpty()) { + return ""; + } + + // Explicación del Regex: + // ^(?:root\.)? -> Grupo opcional al inicio: busca "root." pero no lo captura. + // (.*) -> Grupo 1: Captura codiciosamente todo el camino intermedio. + // \.[^.]+$ -> Busca el último punto y lo que sigue hasta el final (la medida). + return pvName.replaceFirst("^(?:root\\.)?(.*)\\.[^.]+$", "$1"); + } } diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcRTImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcRTImpl.java index 017077c9..d0f728b9 100644 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcRTImpl.java +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/java/org/apache/plc4x/merlot/archiver/impl/MerlotHtcRTImpl.java @@ -143,7 +143,6 @@ public List getPVs(String strPV, String init, String end) { }) .collect(Collectors.toList()); if (listPVs.size() == 0) { - System.out.println("Tamaño es cero."); VType lastvalue = queue.get(0); VDouble valorOriginal = (VDouble) lastvalue; Time tv = Time.of(Instant.parse(init)); diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/resources/OSGI-INF/blueprint/archiver-service.xml b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/resources/OSGI-INF/blueprint/archiver-service.xml index 3b216f99..23a1d37b 100644 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/resources/OSGI-INF/blueprint/archiver-service.xml +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/main/resources/OSGI-INF/blueprint/archiver-service.xml @@ -103,8 +103,7 @@ init-method="init" destroy-method="destroy" scope="singleton" - activation="eager"> - + activation="eager"> + - + + + + + + + + + + + + @@ -228,5 +233,14 @@ + + + + + + + + + \ No newline at end of file diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/test/java/org/apache/plc4x/merlot/archiver/test/MerlotIoTDBMappingTest.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/test/java/org/apache/plc4x/merlot/archiver/test/MerlotIoTDBMappingTest.java new file mode 100644 index 00000000..1b9f6860 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archiver/src/test/java/org/apache/plc4x/merlot/archiver/test/MerlotIoTDBMappingTest.java @@ -0,0 +1,49 @@ +package org.apache.plc4x.merlot.archiver.test; + +import org.apache.iotdb.pipe.api.type.Type; +import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarDouble; +import org.apache.plc4x.merlot.api.PB.EPICSEvent.ScalarFloat; +import org.apache.plc4x.merlot.archiver.core.MerlotIoTDBMapping; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; // JUnit 5 +import org.junit.jupiter.api.Test; + +public class MerlotIoTDBMappingTest { + private MerlotIoTDBMapping.EpicsMetadata testMeta; + + @BeforeEach + public void setUp() { + // Inicialización correcta antes de cada test + testMeta = new MerlotIoTDBMapping.EpicsMetadata(12345678, 999, 1, 2); + } + + @Test + public void testConvertFloat() { + float value = 12.34f; + Object result = MerlotIoTDBMapping.fromIotdb(Type.FLOAT).convert(value, testMeta); + + Assertions.assertTrue(result instanceof ScalarFloat); + ScalarFloat scalar = (ScalarFloat) result; + Assertions.assertEquals(value, scalar.getVal(), 0.001); + Assertions.assertEquals(testMeta.nano, scalar.getNano()); + } + + @Test + public void testConvertDouble() { + double value = 99.999; + Object result = MerlotIoTDBMapping.fromIotdb(Type.DOUBLE).convert(value, testMeta); + + Assertions.assertTrue(result instanceof ScalarDouble); + ScalarDouble scalar = (ScalarDouble) result; + Assertions.assertEquals(value, scalar.getVal(), 0.00001); + Assertions.assertEquals(testMeta.status, scalar.getStatus()); + } + + @Test + public void testUnsupportedType() { + // Verificamos que lance la excepción esperada para tipos como BLOB + Assertions.assertThrows(UnsupportedOperationException.class, () -> { + MerlotIoTDBMapping.fromIotdb(Type.BLOB); + }); + } +} \ No newline at end of file diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.features/src/main/feature/feature.xml b/plc4j/tools/merlot/org.apache.plc4x.merlot.features/src/main/feature/feature.xml index e2e1566b..120b6d09 100755 --- a/plc4j/tools/merlot/org.apache.plc4x.merlot.features/src/main/feature/feature.xml +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.features/src/main/feature/feature.xml @@ -102,7 +102,7 @@ plc4x-core - + plc4x-archiver @@ -728,7 +728,9 @@ dataSourceName=rtdb wrap:mvn:org.apache.iotdb/iotdb-thrift-commons/${iotdb.session.version}/$Bundle-SymbolicName=iotdb-thrift-commons&Bundle-Version=${iotdb.session.version}&Bundle-ManifestVersion="2"&Export-Package=*;version="${iotdb.session.version}",!* mvn:org.apache.iotdb/iotdb-thrift/${iotdb.session.version} - wrap:mvn:org.apache.tsfile/common/${tsfile.version}/$Bundle-SymbolicName=common&Bundle-Version=${tsfile.version}&Bundle-ManifestVersion="2"&Export-Package=*;version="${tsfile.version}",!* + + + wrap:mvn:org.apache.tsfile/common/${tsfile.version}$Bundle-SymbolicName=common&Bundle-Version=${tsfile.version}&Export-Package=*;-org.apache.tsfile.utils mvn:org.apache.tsfile/tsfile/${tsfile.version} mvn:net.jpountz.lz4/lz4/1.3.0