From 64569636a532b22566443e005a262ff869700212 Mon Sep 17 00:00:00 2001 From: Nico Montanari Date: Tue, 21 Apr 2026 11:29:43 +0200 Subject: [PATCH] feat: add river levels monitoring and management features --- README.md | 27 +++++ sql/rivers.sql | 25 +++++ src/models/river-level.ts | 28 +++++ src/models/river.ts | 54 +++++++++ src/routes/rivers.ts | 148 +++++++++++++++++++++++++ src/scheduler.ts | 2 + src/server.ts | 2 + src/services/river-sensors.ts | 40 +++++++ src/tasks/river-levels.ts | 73 +++++++++++++ src/utilities/river-sensors.ts | 69 ++++++++++++ src/utilities/telegram.ts | 25 +++++ tests/services/river-sensors.ts | 114 +++++++++++++++++++ tests/utilities/river-sensors.ts | 181 +++++++++++++++++++++++++++++++ 13 files changed, 788 insertions(+) create mode 100644 sql/rivers.sql create mode 100644 src/models/river-level.ts create mode 100644 src/models/river.ts create mode 100644 src/routes/rivers.ts create mode 100644 src/services/river-sensors.ts create mode 100644 src/tasks/river-levels.ts create mode 100644 src/utilities/river-sensors.ts create mode 100644 tests/services/river-sensors.ts create mode 100644 tests/utilities/river-sensors.ts diff --git a/README.md b/README.md index a70f753..36421c6 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,11 @@ The system interfaces with various official and unofficial sources to provide a - **Details:** Dynamically retrieves forecast maps for the following day via the Pretemp archive. - **Notification:** If conditions require it (presence of ongoing alerts), it sends the thunderstorm forecast image to the Telegram channel. +4. **River levels (Allerta Meteo hydrometric stations)** + - **Feature:** Monitoring of water levels at configured hydrometric stations. + - **Details:** Periodically polls the Allerta Meteo time-series endpoint for each registered station and compares the latest reading against operator-defined thresholds (`soglia1`, `soglia2`, `soglia3`). Station metadata and thresholds live in the `rivers` table; each check is appended to `river_levels`. + - **Notification:** Sends a Telegram message only on crossing events — when a reading rises above or falls below a threshold relative to the previous check. + ## Scheduled Tasks (Crons) The application relies on scheduled tasks (crons) to automate the weather monitoring flow: @@ -31,6 +36,28 @@ The application relies on scheduled tasks (crons) to automate the weather monito - Verifies and sends the Pretemp map for the following day, provided there is an ongoing alert and the map hasn't been sent yet. - **Estofex Report Check** - Verifies and sends the Estofex map for the following day, following the same conditional logic based on ongoing alerts. +- **River Levels Check** + - Every 5 minutes, for each row in the `rivers` table, fetches the latest hydrometric reading and appends it to `river_levels`. Sends a Telegram message only when the reading crosses one of the configured thresholds since the previous check. + +## Database bootstrap + +Schema is applied manually (no migration tooling). The required tables are in [sql/rivers.sql](sql/rivers.sql): + +```bash +psql "$DATABASE_URL" -f sql/rivers.sql +``` + +## River stations CRUD + +Stations are managed via HTTP (port 3000): + +- `GET /rivers` — list registered stations +- `POST /rivers` — create; body `{ station_id, river_name, station_name, soglia1?, soglia2?, soglia3? }` +- `PATCH /rivers/:id` — update any of `river_name`, `station_name`, `soglia1`, `soglia2`, `soglia3` +- `DELETE /rivers/:id` — delete (cascades to `river_levels`) +- `POST /river-levels` — trigger an on-demand check (returns `{ checked, crossings, skipped }`) + +The `station_id` is the Allerta Meteo `idstazione`; threshold values must be looked up manually from the [Allerta Meteo portal](https://allertameteo.regione.emilia-romagna.it/) and stored alongside the station. ## Configuration and Installation diff --git a/sql/rivers.sql b/sql/rivers.sql new file mode 100644 index 0000000..2e8b0c4 --- /dev/null +++ b/sql/rivers.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS rivers ( + id SERIAL PRIMARY KEY, + station_id TEXT NOT NULL UNIQUE, + river_name TEXT NOT NULL, + station_name TEXT NOT NULL, + soglia1 NUMERIC, + soglia2 NUMERIC, + soglia3 NUMERIC, + created_on TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_on TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE TABLE IF NOT EXISTS river_levels ( + id SERIAL PRIMARY KEY, + river_id INTEGER NOT NULL REFERENCES rivers(id) ON DELETE CASCADE, + value NUMERIC NOT NULL, + measured_at TIMESTAMPTZ NOT NULL, + soglia1_above BOOLEAN, + soglia2_above BOOLEAN, + soglia3_above BOOLEAN, + created_on TIMESTAMPTZ NOT NULL DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS river_levels_river_id_created_on_idx + ON river_levels (river_id, created_on DESC); diff --git a/src/models/river-level.ts b/src/models/river-level.ts new file mode 100644 index 0000000..56db2ef --- /dev/null +++ b/src/models/river-level.ts @@ -0,0 +1,28 @@ +import { database } from '..' + +const tableName = 'river_levels' + +export interface RiverLevel { + id: number + river_id: number + value: number + measured_at: string + soglia1_above: boolean | null + soglia2_above: boolean | null + soglia3_above: boolean | null + created_on: string +} + +export type CreatableRiverLevel = Omit + +export const getLatestRiverLevel = async (riverId: number): Promise => { + const query = `SELECT * FROM ${tableName} WHERE river_id = $1 ORDER BY created_on DESC, id DESC LIMIT 1` + + const rows = await database.query(query, [riverId]) + + return rows[0] +} + +export const createRiverLevel = async (level: CreatableRiverLevel): Promise => { + return database.create(tableName, { ...level, created_on: new Date().toISOString() }) +} diff --git a/src/models/river.ts b/src/models/river.ts new file mode 100644 index 0000000..ee95d85 --- /dev/null +++ b/src/models/river.ts @@ -0,0 +1,54 @@ +import { database } from '..' + +const tableName = 'rivers' + +export interface River { + id: number + station_id: string + river_name: string + station_name: string + soglia1: number | null + soglia2: number | null + soglia3: number | null + created_on: string + updated_on: string +} + +export type CreatableRiver = Omit +export type RiverPatch = Partial> + +export const getRivers = async (): Promise => { + const query = `SELECT * FROM ${tableName} ORDER BY id ASC` + + return database.query(query) +} + +export const getRiverById = async (id: number): Promise => { + const query = `SELECT * FROM ${tableName} WHERE id = $1` + + const rows = await database.query(query, [id]) + + return rows[0] +} + +export const getRiverByStationId = async (stationId: string): Promise => { + const query = `SELECT * FROM ${tableName} WHERE station_id = $1` + + const rows = await database.query(query, [stationId]) + + return rows[0] +} + +export const createRiver = async (river: CreatableRiver): Promise => { + const now = new Date().toISOString() + + return database.create(tableName, { ...river, created_on: now, updated_on: now }) +} + +export const updateRiver = async (id: number, patch: RiverPatch): Promise => { + return database.edit(tableName, { ...patch, updated_on: new Date().toISOString() }, id) +} + +export const deleteRiver = async (id: number): Promise => { + await database.delete(tableName, id) +} diff --git a/src/routes/rivers.ts b/src/routes/rivers.ts new file mode 100644 index 0000000..2800b60 --- /dev/null +++ b/src/routes/rivers.ts @@ -0,0 +1,148 @@ +import { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify' +import { + createRiver, + deleteRiver, + getRiverById, + getRiverByStationId, + getRivers, + updateRiver, +} from '../models/river' +import { runRiverLevelCheck } from '../tasks/river-levels' + +const nullableNumber = { type: ['number', 'null'] } + +const createBodySchema = { + type: 'object', + required: ['station_id', 'river_name', 'station_name'], + additionalProperties: false, + properties: { + station_id: { type: 'string', minLength: 1, maxLength: 64 }, + river_name: { type: 'string', minLength: 1, maxLength: 128 }, + station_name: { type: 'string', minLength: 1, maxLength: 128 }, + soglia1: nullableNumber, + soglia2: nullableNumber, + soglia3: nullableNumber, + }, +} + +const patchBodySchema = { + type: 'object', + additionalProperties: false, + minProperties: 1, + properties: { + river_name: { type: 'string', minLength: 1, maxLength: 128 }, + station_name: { type: 'string', minLength: 1, maxLength: 128 }, + soglia1: nullableNumber, + soglia2: nullableNumber, + soglia3: nullableNumber, + }, +} + +const idParamSchema = { + type: 'object', + required: ['id'], + additionalProperties: false, + properties: { + id: { type: 'integer', minimum: 1 }, + }, +} + +interface IdParams { + id: number +} + +interface CreateRiverBody { + station_id: string + river_name: string + station_name: string + soglia1?: number | null + soglia2?: number | null + soglia3?: number | null +} + +interface PatchRiverBody { + river_name?: string + station_name?: string + soglia1?: number | null + soglia2?: number | null + soglia3?: number | null +} + +export const registerRiversRoutes = (fastify: FastifyInstance) => { + fastify.route({ + method: 'GET', + url: '/rivers', + handler: async (_request: FastifyRequest, reply: FastifyReply) => { + const rivers = await getRivers() + reply.status(200).send(rivers) + }, + }) + + fastify.route<{ Body: CreateRiverBody }>({ + method: 'POST', + url: '/rivers', + schema: { body: createBodySchema }, + handler: async (request, reply) => { + const existing = await getRiverByStationId(request.body.station_id) + + if (existing) { + return reply.status(409).send({ error: 'Station already registered' }) + } + + const created = await createRiver({ + station_id: request.body.station_id, + river_name: request.body.river_name, + station_name: request.body.station_name, + soglia1: request.body.soglia1 ?? null, + soglia2: request.body.soglia2 ?? null, + soglia3: request.body.soglia3 ?? null, + }) + + reply.status(201).send(created) + }, + }) + + fastify.route<{ Params: IdParams; Body: PatchRiverBody }>({ + method: 'PATCH', + url: '/rivers/:id', + schema: { params: idParamSchema, body: patchBodySchema }, + handler: async (request, reply) => { + const river = await getRiverById(request.params.id) + + if (!river) { + return reply.status(404).send({ error: 'River not found' }) + } + + const updated = await updateRiver(request.params.id, request.body) + reply.status(200).send(updated) + }, + }) + + fastify.route<{ Params: IdParams }>({ + method: 'DELETE', + url: '/rivers/:id', + schema: { params: idParamSchema }, + handler: async (request, reply) => { + const river = await getRiverById(request.params.id) + + if (!river) { + return reply.status(404).send({ error: 'River not found' }) + } + + await deleteRiver(request.params.id) + reply.status(204).send(undefined) + }, + }) + + fastify.route({ + method: 'POST', + url: '/river-levels', + schema: { + body: { type: 'object', additionalProperties: false, maxProperties: 0 }, + }, + handler: async (_request: FastifyRequest, reply: FastifyReply) => { + const summary = await runRiverLevelCheck() + reply.status(200).send(summary) + }, + }) +} diff --git a/src/scheduler.ts b/src/scheduler.ts index ca862e0..1787376 100644 --- a/src/scheduler.ts +++ b/src/scheduler.ts @@ -2,6 +2,7 @@ import { Cron } from 'croner' import { runMeteoAlertCheck } from './tasks/meteo-alerts' import { runPretempCheck } from './tasks/pretemp' import { runEstofexCheck } from './tasks/estofex' +import { runRiverLevelCheck } from './tasks/river-levels' import logger from './logger' const jobs: Cron[] = [] @@ -35,6 +36,7 @@ export const startScheduler = () => { schedule('meteo-alerts', '*/5 * * * *', runMeteoAlertCheck) schedule('pretemp', '*/5 * * * *', runPretempCheck) schedule('estofex', '*/5 * * * *', runEstofexCheck) + schedule('river-levels', '*/5 * * * *', runRiverLevelCheck) logger.info({ count: jobs.length }, 'Scheduler started') } diff --git a/src/server.ts b/src/server.ts index bc01335..c4e1894 100644 --- a/src/server.ts +++ b/src/server.ts @@ -4,6 +4,7 @@ import i18next from 'i18next' import italian from './resources/locales/it.json' import { registerTestMessageRoutes } from './routes/test-message' import { registerForecastReportsRoutes } from './routes/forecast-reports' +import { registerRiversRoutes } from './routes/rivers' import logger from './logger' const translations = { @@ -45,6 +46,7 @@ export const startServer = async (): Promise => { registerTestMessageRoutes(app) registerMeteoAlertsRoutes(app) registerForecastReportsRoutes(app) + registerRiversRoutes(app) await fastify.listen({ host: '127.0.0.1', diff --git a/src/services/river-sensors.ts b/src/services/river-sensors.ts new file mode 100644 index 0000000..369064e --- /dev/null +++ b/src/services/river-sensors.ts @@ -0,0 +1,40 @@ +import { http } from './http' +import logger from '../logger' + +export interface RiverSensorTimePoint { + t: number + v: number +} + +const VARIABILE = '254,0,0/1,-,-,-/B13215' +const TIME_SERIES_BASE_URL = 'https://allertameteo.regione.emilia-romagna.it/o/api/allerta/get-time-series/' + +export const getRiverSensorTimeSeries = async (stationId: string): Promise => { + const url = `${TIME_SERIES_BASE_URL}?stazione=${encodeURIComponent(stationId)}&variabile=${VARIABILE}` + + try { + const response = await http.get(url).then((res) => res.data) + + if (!Array.isArray(response)) { + return [] + } + + return response.filter( + (point): point is RiverSensorTimePoint => + point != null && typeof point.t === 'number' && typeof point.v === 'number' + ) + } catch (error) { + logger.error({ err: error, stationId }, 'Failed to retrieve river sensor time series') + throw error + } +} + +export const getLatestRiverSensorValue = async (stationId: string): Promise => { + const series = await getRiverSensorTimeSeries(stationId) + + if (series.length === 0) { + return undefined + } + + return series.reduce((latest, point) => (point.t > latest.t ? point : latest), series[0]) +} diff --git a/src/tasks/river-levels.ts b/src/tasks/river-levels.ts new file mode 100644 index 0000000..44db897 --- /dev/null +++ b/src/tasks/river-levels.ts @@ -0,0 +1,73 @@ +import { getRivers } from '../models/river' +import { createRiverLevel, getLatestRiverLevel } from '../models/river-level' +import { getLatestRiverSensorValue } from '../services/river-sensors' +import { detectThresholdCrossings, getActiveThresholds, ThresholdBooleans } from '../utilities/river-sensors' +import { sendRiverLevelCrossingMessage } from '../utilities/telegram' +import logger from '../logger' + +const log = logger.child({ task: 'river-levels' }) + +export interface RiverLevelCheckSummary { + checked: number + crossings: number + skipped: number +} + +export const runRiverLevelCheck = async (): Promise => { + const rivers = await getRivers() + const summary: RiverLevelCheckSummary = { checked: 0, crossings: 0, skipped: 0 } + + if (rivers.length === 0) { + return summary + } + + for (const river of rivers) { + const latest = await getLatestRiverSensorValue(river.station_id).catch((err) => { + log.error({ err, stationId: river.station_id, riverId: river.id }, 'Failed to fetch latest river sensor value') + return undefined + }) + + if (!latest) { + summary.skipped += 1 + continue + } + + const currentValue = Number(latest.v) + const thresholds = getActiveThresholds(river) + const previous = await getLatestRiverLevel(river.id) + const previousState: ThresholdBooleans | undefined = previous + ? { + soglia1_above: previous.soglia1_above, + soglia2_above: previous.soglia2_above, + soglia3_above: previous.soglia3_above, + } + : undefined + + const { crossings, nextState } = detectThresholdCrossings(currentValue, thresholds, previousState) + + await createRiverLevel({ + river_id: river.id, + value: currentValue, + measured_at: new Date(latest.t).toISOString(), + soglia1_above: nextState.soglia1_above, + soglia2_above: nextState.soglia2_above, + soglia3_above: nextState.soglia3_above, + }) + + for (const crossing of crossings) { + try { + await sendRiverLevelCrossingMessage(river, crossing, currentValue) + summary.crossings += 1 + } catch (err) { + log.error( + { err, riverId: river.id, stationId: river.station_id, threshold: crossing.threshold.key }, + 'Failed to send river level crossing Telegram message' + ) + } + } + + summary.checked += 1 + } + + return summary +} diff --git a/src/utilities/river-sensors.ts b/src/utilities/river-sensors.ts new file mode 100644 index 0000000..227f890 --- /dev/null +++ b/src/utilities/river-sensors.ts @@ -0,0 +1,69 @@ +import { River } from '../models/river' + +export type ThresholdKey = 'soglia1' | 'soglia2' | 'soglia3' + +export interface ActiveThreshold { + key: ThresholdKey + value: number +} + +export interface ThresholdCrossing { + threshold: ActiveThreshold + direction: 'above' | 'below' +} + +export interface ThresholdBooleans { + soglia1_above: boolean | null + soglia2_above: boolean | null + soglia3_above: boolean | null +} + +export interface CrossingDetectionResult { + crossings: ThresholdCrossing[] + nextState: ThresholdBooleans +} + +const THRESHOLD_KEYS: ThresholdKey[] = ['soglia1', 'soglia2', 'soglia3'] + +export const getActiveThresholds = (river: River): ActiveThreshold[] => { + return THRESHOLD_KEYS.reduce((acc, key) => { + const raw = river[key] + const value = raw == null ? null : Number(raw) + + if (value != null && Number.isFinite(value)) { + acc.push({ key, value }) + } + + return acc + }, []) +} + +export const detectThresholdCrossings = ( + currentValue: number, + thresholds: ActiveThreshold[], + previousState: ThresholdBooleans | undefined +): CrossingDetectionResult => { + const nextState: ThresholdBooleans = { + soglia1_above: null, + soglia2_above: null, + soglia3_above: null, + } + const crossings: ThresholdCrossing[] = [] + + for (const threshold of thresholds) { + const currentAbove = currentValue > threshold.value + const aboveKey = `${threshold.key}_above` as keyof ThresholdBooleans + nextState[aboveKey] = currentAbove + + const previousAbove = previousState?.[aboveKey] ?? null + + if (previousState && previousAbove !== null && previousAbove !== currentAbove) { + crossings.push({ + threshold, + direction: currentAbove ? 'above' : 'below', + }) + } + } + + return { crossings, nextState } +} diff --git a/src/utilities/telegram.ts b/src/utilities/telegram.ts index 61a01a9..2dcbd54 100644 --- a/src/utilities/telegram.ts +++ b/src/utilities/telegram.ts @@ -2,10 +2,18 @@ import { sendTelegramMessage } from '../services/telegram' import { InlineKeyboardButton } from 'telegraf/typings/core/types/typegram' import { translateKey } from './common' import { ParsedMeteoAlert } from './meteo-alerts' +import { ThresholdCrossing } from './river-sensors' +import { River } from '../models/river' import { config } from '../config/config' const separator = '--------------------------------' +const thresholdLabel: Record = { + soglia1: 'soglia 1', + soglia2: 'soglia 2', + soglia3: 'soglia 3', +} + export const sendNewTomorrowAlertMessage = async (alert: ParsedMeteoAlert) => { const criticDataMessage = Object.keys(alert.criticZoneData) .map((key) => { @@ -41,3 +49,20 @@ ${separator} }, }) } + +export const sendRiverLevelCrossingMessage = async ( + river: River, + crossing: ThresholdCrossing, + currentValue: number +) => { + const label = thresholdLabel[crossing.threshold.key] + const directionLine = + crossing.direction === 'above' ? `⬆️ Superata ${label}` : `⬇️ Rientrata sotto ${label}` + + const textMessage = `🌊 Livello del ${river.river_name} — ${river.station_name} +${directionLine} +Livello attuale: ${currentValue} m +Soglia: ${crossing.threshold.value} m` + + await sendTelegramMessage(config.chat_id, textMessage, { parse_mode: 'HTML' }) +} diff --git a/tests/services/river-sensors.ts b/tests/services/river-sensors.ts new file mode 100644 index 0000000..7e3b021 --- /dev/null +++ b/tests/services/river-sensors.ts @@ -0,0 +1,114 @@ +import { expect } from 'chai' +import { afterEach, beforeEach, describe, it } from 'mocha' +import sinon, { SinonStub } from 'sinon' +import { http } from '../../src/services/http' +import { getLatestRiverSensorValue, getRiverSensorTimeSeries } from '../../src/services/river-sensors' + +describe('tests/services/river-sensors', () => { + const expectedUrl = + 'https://allertameteo.regione.emilia-romagna.it/o/api/allerta/get-time-series/?stazione=3130&variabile=254,0,0/1,-,-,-/B13215' + + let axiosGetStub: SinonStub + + beforeEach(() => { + axiosGetStub = sinon.stub(http, 'get') + }) + + afterEach(() => { + axiosGetStub.restore() + }) + + describe('getRiverSensorTimeSeries', () => { + it('builds the correct URL and returns the full array', async () => { + const series = [ + { t: 1, v: 1.5 }, + { t: 2, v: 1.6 }, + ] + + axiosGetStub.resolves({ data: series }) + + const result = await getRiverSensorTimeSeries('3130') + + expect(result).to.deep.equal(series) + expect(axiosGetStub.calledOnceWithExactly(expectedUrl)).to.equal(true) + }) + + it('returns empty array when response is not an array', async () => { + axiosGetStub.resolves({ data: null }) + + const result = await getRiverSensorTimeSeries('3130') + + expect(result).to.deep.equal([]) + }) + + it('filters out malformed points', async () => { + axiosGetStub.resolves({ + data: [ + { t: 1, v: 1.5 }, + { t: 'bad', v: 1.6 }, + null, + { t: 3, v: 'bad' }, + { t: 4, v: 2.0 }, + ], + }) + + const result = await getRiverSensorTimeSeries('3130') + + expect(result).to.deep.equal([ + { t: 1, v: 1.5 }, + { t: 4, v: 2.0 }, + ]) + }) + + it('URL-encodes the station id', async () => { + axiosGetStub.resolves({ data: [] }) + + await getRiverSensorTimeSeries('abc/123') + + expect( + axiosGetStub.calledOnceWithExactly( + 'https://allertameteo.regione.emilia-romagna.it/o/api/allerta/get-time-series/?stazione=abc%2F123&variabile=254,0,0/1,-,-,-/B13215' + ) + ).to.equal(true) + }) + + it('rethrows and logs on HTTP failure', async () => { + axiosGetStub.rejects(new Error('boom')) + + let caught: unknown + + try { + await getRiverSensorTimeSeries('3130') + } catch (err) { + caught = err + } + + expect(caught).to.be.instanceOf(Error) + expect((caught as Error).message).to.equal('boom') + }) + }) + + describe('getLatestRiverSensorValue', () => { + it('returns undefined when the series is empty', async () => { + axiosGetStub.resolves({ data: [] }) + + const result = await getLatestRiverSensorValue('3130') + + expect(result).to.equal(undefined) + }) + + it('returns the point with the greatest timestamp', async () => { + axiosGetStub.resolves({ + data: [ + { t: 10, v: 1.1 }, + { t: 30, v: 1.3 }, + { t: 20, v: 1.2 }, + ], + }) + + const result = await getLatestRiverSensorValue('3130') + + expect(result).to.deep.equal({ t: 30, v: 1.3 }) + }) + }) +}) diff --git a/tests/utilities/river-sensors.ts b/tests/utilities/river-sensors.ts new file mode 100644 index 0000000..356a81f --- /dev/null +++ b/tests/utilities/river-sensors.ts @@ -0,0 +1,181 @@ +import { expect } from 'chai' +import { describe, it } from 'mocha' +import { River } from '../../src/models/river' +import { detectThresholdCrossings, getActiveThresholds, ThresholdBooleans } from '../../src/utilities/river-sensors' + +const makeRiver = (overrides: Partial = {}): River => ({ + id: 1, + station_id: '3130', + river_name: 'Reno', + station_name: 'Casalecchio Chiusa', + soglia1: 1.5, + soglia2: 2.2, + soglia3: 3.0, + created_on: new Date().toISOString(), + updated_on: new Date().toISOString(), + ...overrides, +}) + +describe('tests/utilities/river-sensors', () => { + describe('getActiveThresholds', () => { + it('returns all three thresholds when all are set', () => { + const thresholds = getActiveThresholds(makeRiver()) + + expect(thresholds).to.deep.equal([ + { key: 'soglia1', value: 1.5 }, + { key: 'soglia2', value: 2.2 }, + { key: 'soglia3', value: 3.0 }, + ]) + }) + + it('drops null thresholds', () => { + const thresholds = getActiveThresholds(makeRiver({ soglia2: null })) + + expect(thresholds).to.deep.equal([ + { key: 'soglia1', value: 1.5 }, + { key: 'soglia3', value: 3.0 }, + ]) + }) + + it('coerces numeric strings (pg returns NUMERIC as string)', () => { + const river = makeRiver({ + soglia1: '1.5' as unknown as number, + soglia2: null, + soglia3: '3.0' as unknown as number, + }) + + expect(getActiveThresholds(river)).to.deep.equal([ + { key: 'soglia1', value: 1.5 }, + { key: 'soglia3', value: 3.0 }, + ]) + }) + + it('returns empty array when no thresholds are set', () => { + expect(getActiveThresholds(makeRiver({ soglia1: null, soglia2: null, soglia3: null }))).to.deep.equal([]) + }) + }) + + describe('detectThresholdCrossings', () => { + const thresholds = [ + { key: 'soglia1' as const, value: 1.5 }, + { key: 'soglia2' as const, value: 2.2 }, + { key: 'soglia3' as const, value: 3.0 }, + ] + + it('seeds state without emitting crossings on first observation', () => { + const result = detectThresholdCrossings(2.5, thresholds, undefined) + + expect(result.crossings).to.deep.equal([]) + expect(result.nextState).to.deep.equal({ + soglia1_above: true, + soglia2_above: true, + soglia3_above: false, + }) + }) + + it('emits no crossing when the side is unchanged', () => { + const previous: ThresholdBooleans = { + soglia1_above: true, + soglia2_above: false, + soglia3_above: false, + } + + const result = detectThresholdCrossings(2.0, thresholds, previous) + + expect(result.crossings).to.deep.equal([]) + expect(result.nextState).to.deep.equal(previous) + }) + + it('emits an above crossing when the level rises past a threshold', () => { + const previous: ThresholdBooleans = { + soglia1_above: false, + soglia2_above: false, + soglia3_above: false, + } + + const result = detectThresholdCrossings(1.8, thresholds, previous) + + expect(result.crossings).to.deep.equal([ + { threshold: { key: 'soglia1', value: 1.5 }, direction: 'above' }, + ]) + expect(result.nextState).to.deep.equal({ + soglia1_above: true, + soglia2_above: false, + soglia3_above: false, + }) + }) + + it('emits a below crossing when the level drops under a threshold', () => { + const previous: ThresholdBooleans = { + soglia1_above: true, + soglia2_above: false, + soglia3_above: false, + } + + const result = detectThresholdCrossings(1.2, thresholds, previous) + + expect(result.crossings).to.deep.equal([ + { threshold: { key: 'soglia1', value: 1.5 }, direction: 'below' }, + ]) + }) + + it('emits one crossing per threshold when multiple thresholds flip at once', () => { + const previous: ThresholdBooleans = { + soglia1_above: false, + soglia2_above: false, + soglia3_above: false, + } + + const result = detectThresholdCrossings(3.5, thresholds, previous) + + expect(result.crossings).to.deep.equal([ + { threshold: { key: 'soglia1', value: 1.5 }, direction: 'above' }, + { threshold: { key: 'soglia2', value: 2.2 }, direction: 'above' }, + { threshold: { key: 'soglia3', value: 3.0 }, direction: 'above' }, + ]) + expect(result.nextState).to.deep.equal({ + soglia1_above: true, + soglia2_above: true, + soglia3_above: true, + }) + }) + + it('treats a null previous boolean as no-prior-reference for that threshold', () => { + const previous: ThresholdBooleans = { + soglia1_above: null, + soglia2_above: false, + soglia3_above: null, + } + + const result = detectThresholdCrossings(2.5, thresholds, previous) + + expect(result.crossings).to.deep.equal([ + { threshold: { key: 'soglia2', value: 2.2 }, direction: 'above' }, + ]) + expect(result.nextState).to.deep.equal({ + soglia1_above: true, + soglia2_above: true, + soglia3_above: false, + }) + }) + + it('only considers provided active thresholds', () => { + const previous: ThresholdBooleans = { + soglia1_above: false, + soglia2_above: false, + soglia3_above: false, + } + + const result = detectThresholdCrossings(2.5, [{ key: 'soglia1', value: 1.5 }], previous) + + expect(result.crossings).to.deep.equal([ + { threshold: { key: 'soglia1', value: 1.5 }, direction: 'above' }, + ]) + expect(result.nextState).to.deep.equal({ + soglia1_above: true, + soglia2_above: null, + soglia3_above: null, + }) + }) + }) +})