diff --git a/experiments/input/test.csv b/experiments/input/test.csv
new file mode 100644
index 00000000..0bcd92e8
--- /dev/null
+++ b/experiments/input/test.csv
@@ -0,0 +1,18 @@
+,trace_id,event_type,timestamp
+0,0,a,2020-08-15 12:56:42
+1,0,b,2020-08-16 12:56:42
+2,0,a,2020-08-19 12:56:42
+3,0,b,2020-08-20 14:21:02
+4,1,b,2020-08-15 12:11:54
+5,1,c,2020-08-16 12:11:54
+6,2,c,2020-08-15 12:31:04
+7,2,b,2020-08-16 12:31:04
+8,2,a,2020-08-18 12:31:04
+9,0,a,2020-09-03 12:56:42
+10,0,b,2020-09-05 12:56:42
+11,1,a,2020-09-05 12:11:54
+12,2,c,2020-09-07 12:31:04
+13,2,b,2020-09-08 12:31:04
+14,3,c,2020-09-07 12:31:04
+15,3,d,2020-09-08 12:31:04
+16,3,a,2020-09-09 12:31:04
diff --git a/src/main/scala/auth/datalab/siesta/BusinessLogic/IngestData/ReadLogFile.scala b/src/main/scala/auth/datalab/siesta/BusinessLogic/IngestData/ReadLogFile.scala
index e70dba31..7e03e52e 100644
--- a/src/main/scala/auth/datalab/siesta/BusinessLogic/IngestData/ReadLogFile.scala
+++ b/src/main/scala/auth/datalab/siesta/BusinessLogic/IngestData/ReadLogFile.scala
@@ -7,6 +7,7 @@ import org.deckfour.xes.in.XParserRegistry
 import org.deckfour.xes.model.{XLog, XTrace}
 
 import java.io.{File, FileInputStream}
+import java.sql.Timestamp
 import java.text.SimpleDateFormat
 import java.util.Scanner
 import scala.collection.convert.ImplicitConversions.`list asScalaBuffer`
@@ -43,7 +44,9 @@ object ReadLogFile {
       this.readFromXes(fileName)
     } else if (fileName.split('.')(1) == "withTimestamp") {
       this.readWithTimestamps(fileName, ",", "/delab/")
-    } else {
+    } else if (fileName.split('.')(1) == "csv") {
+      this.readFromCSV(fileName)
+    } else {
       throw new Exception("Not recognised file type")
     }
   }
@@ -191,4 +194,37 @@ object ReadLogFile {
     par
   }
 
+  def readFromCSV(fileName: String): RDD[Sequence] = {
+    val spark = SparkSession.builder().getOrCreate()
+    // Read the CSV file into a DataFrame, keeping only the required columns
+    val df = spark.read.option("header", "true")
+      .csv(fileName)
+      .select("trace_id", "event_type", "timestamp")
+
+    // Convert the DataFrame to an RDD and group the events by trace_id
+    val groupedRDD = df.rdd.map(row =>
+      (row.getString(0), new Event(timestamp = row.getString(2), event_type = row.getString(1), trace_id = row.getString(0),
+        position = 0))
+    ).combineByKey(
+      (event: Event) => List(event), // create a list with the first event
+      (acc: List[Event], event: Event) => event :: acc, // add an event to the list
+      (acc1: List[Event], acc2: List[Event]) => acc1 ++ acc2 // merge lists from different partitions
+    )
+
+    // Sort each trace by timestamp and assign positions
+    import spark.implicits._
+    val sequencesRDD = groupedRDD.map { case (traceId, events) =>
+      val sortedEvents = events.map(x => (x, Timestamp.valueOf(x.timestamp).getTime)).toList
+        .sortBy(_._2).zipWithIndex.map { case (event, index) =>
+          new Event(timestamp = event._1.timestamp, event_type = event._1.event_type, trace_id = event._1.trace_id,
+            position = index
+          )
+        }
+      new Sequence(sortedEvents, traceId)
+    }
+
+    sequencesRDD
+  }
+
+
 }
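
A minimal sketch of how the new CSV path could be exercised against the test log added above, assuming a local SparkSession and that ReadLogFile lives in the auth.datalab.siesta.BusinessLogic.IngestData package; the driver object name and the local master setting are illustrative, not part of this change:

import org.apache.spark.sql.SparkSession
import auth.datalab.siesta.BusinessLogic.IngestData.ReadLogFile

object ReadCsvSmokeTest {
  def main(args: Array[String]): Unit = {
    // Assumption: run on a local master just to inspect the parsed traces
    val spark = SparkSession.builder()
      .appName("ReadCsvSmokeTest")
      .master("local[*]")
      .getOrCreate()

    // Parse the CSV log added in this diff; readFromCSV groups events by trace_id,
    // sorts each trace by timestamp and assigns 0-based positions
    val sequences = ReadLogFile.readFromCSV("experiments/input/test.csv")

    // The test file contains 4 traces (trace_id 0..3)
    sequences.collect().foreach(println)

    spark.stop()
  }
}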