diff --git a/common/networking/monitoring/README.md b/common/networking/monitoring/README.md index c38d47904ac4550dbda4d8e3915734be9792ec2b..c4ee3d075a37cfc07045adaced7ddf5510b48648 100644 --- a/common/networking/monitoring/README.md +++ b/common/networking/monitoring/README.md @@ -1,4 +1,4 @@ -Currently changes to these files have to be compiled seperately for Go and JavaScript +Currently, changes to these files have to be compiled separately for Go and JavaScript Execute each `./generate-proto.sh` in: - `#/monitoring/monitoring_ui` diff --git a/examples/producer/dummy-data-producer/dummy_data_producer.cpp b/examples/producer/dummy-data-producer/dummy_data_producer.cpp index a636bbfe47b42f3973dde3f560240cedf533ed6f..2c2a84483f2275972c563ea5712dae6acd9d20bd 100644 --- a/examples/producer/dummy-data-producer/dummy_data_producer.cpp +++ b/examples/producer/dummy-data-producer/dummy_data_producer.cpp @@ -41,7 +41,7 @@ void PrintCommandArguments(const Args& args) { << "Raw: " << (args.mode / 100 == 1) << std::endl << "timeout: " << args.timeout_ms << std::endl << "messages in set: " << args.messages_in_set << std::endl - << "pipeline: " << args.pipeline_name << std::endl + << "pipelineStep: " << args.pipeline_name << std::endl << std::endl; } diff --git a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go index 9320ee0da5f7d412e9b24d50d9ae5cdae7f043bc..66757cbb003a2346e2738120249a0e5faec4c506 100644 --- a/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go +++ b/monitoring/monitoring_server/src/asapo_monitoring_server/server/QueryServer.go @@ -451,7 +451,8 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( var step = getOrCreateStep(stepId) step.producesSourceId[source] = true step.producerInstances[producerInstanceId] = true - step.involvedReceivers[result.Record().Values()["receiverName"].(string)] = true + var receiverName = result.Record().Values()["receiverName"].(string) + step.involvedReceivers[receiverName] = true } else if result.Record().Values()["brokerName"] != nil { // data is coming from broker => means it must be a consumer stepId := result.Record().Values()["pipelineStepId"].(string) source := result.Record().Values()["source"].(string) @@ -539,11 +540,12 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( }) } for _, sourceIdAvailableFromStepId := range availableSources[consumingSourceId] { // Add all the others edges + var producingSource = pipelineSteps[sourceIdAvailableFromStepId] edges = append(edges, PipelineEdgeInfo{ fromStepId: sourceIdAvailableFromStepId, toStepId: stepId, sourceId: consumingSourceId, - involvedReceivers: stepInfo.involvedReceivers, + involvedReceivers: producingSource.involvedReceivers, }) } } @@ -567,8 +569,44 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( if !foundAtLeastOne { // probably only requests of files came, but no receiver registered a producer - log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") - return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") + // log.Error("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") + // return nil, errors.New("infinite loop while building topology tree; Still has pipeline steps but found no way how to connect them") + + var unkStep = "Unknown" + + pipelineSteps[unkStep] = PipelineStep{ + producesSourceId: nil, + consumesSourceId: nil, + producerInstances: nil, + consumerInstances: nil, + involvedReceivers: nil, + } + + levels[0] = append(levels[0], PipelineLevelStepInfo{ + stepId: unkStep, + }) + + if len(levels) < 2 { + levels = append(levels, []PipelineLevelStepInfo{}) + } + + for stepId := range remainingStepIds { + var step = pipelineSteps[stepId] + levels[1] = append(levels[1], PipelineLevelStepInfo{ + stepId: stepId, + }) + + for sourceId := range step.consumesSourceId { + edges = append(edges, PipelineEdgeInfo{ + fromStepId: unkStep, + toStepId: stepId, + sourceId: sourceId, + involvedReceivers: nil, + }) + } + } + + break // Go to response building } for sourceId, element := range availableSourcesInNextLevel { @@ -582,6 +620,7 @@ func (s *QueryServer) GetTopology(ctx context.Context, query *pb.ToplogyQuery) ( } } + // Response building var response pb.TopologyResponse for levelIndex, level := range levels { diff --git a/monitoring/monitoring_ui/src/components/FilterSelector.vue b/monitoring/monitoring_ui/src/components/FilterSelector.vue index 839331a3d142cb0c6b9f21c71a3e9418cf4234e8..b38266ca573943692136f2551424d2ec68ded99a 100644 --- a/monitoring/monitoring_ui/src/components/FilterSelector.vue +++ b/monitoring/monitoring_ui/src/components/FilterSelector.vue @@ -52,7 +52,7 @@ <select name="Source" id="source" class="w-32" v-model="selectedSource"> <option v-for="source in availableSources" :key="source">{{source}}</option> </select> - <button>X</button> + <button @click="clearSourceFilter()">X</button> </div> </div> <div class="mt-3 flex flex-col"> @@ -87,14 +87,23 @@ import { toplogyStore } from "../store/toplogyStore"; @Options({ watch: { + selectedBeamtimeFromStore: { + handler(newValue: string | null): void { + this.selectedBeamtime = newValue; + }, + immediate: true, + }, selectedBeamtime(newValue: string | null): void { selectionFilterStore.setFilterBeamtime(newValue); }, currentBeamtimeFilterText(): void { this.selectedBeamtime = selectionFilterStore.state.beamtime; }, + selectedSourceFromStore(newValue: string | null): void { + this.selectedSource = newValue; + }, selectedSource(newValue: string | null): void { - //selectionFilterStore.setFilterBeamtime(newValue); + selectionFilterStore.setFilterSource(newValue); }, currentSourceFilterText(): void { this.selectedSource = selectionFilterStore.state.source; @@ -106,6 +115,13 @@ export default class FilterSelector extends Vue { private selectedSource: string | null = null; private showPopup: boolean = false; + private get selectedBeamtimeFromStore(): string | null { + return selectionFilterStore.state.beamtime; + } + private get selectedSourceFromStore(): string | null { + return selectionFilterStore.state.source; + } + private get hasClearableFilter(): boolean { return selectionFilterStore.hasClearableFilter; } @@ -152,7 +168,6 @@ export default class FilterSelector extends Vue { } private get hasFilterError(): boolean { - console.log('Filte error text: ', errorStore.state.filterErrorText) return !!errorStore.state.filterErrorText; } @@ -167,6 +182,10 @@ export default class FilterSelector extends Vue { selectionFilterStore.setFilterBeamtime(null); } + private clearSourceFilter(): void { + selectionFilterStore.clearSourceFilter(); + } + private get currentSourceFilterText(): string { if (selectionFilterStore.state.source) { return selectionFilterStore.state.source; diff --git a/monitoring/monitoring_ui/src/store/selectionFilterStore.ts b/monitoring/monitoring_ui/src/store/selectionFilterStore.ts index beef5c37a910064217c6493579eacd39b370b02e..21076600530da1fda8ff60ef4a123e0c8d5f3b85 100644 --- a/monitoring/monitoring_ui/src/store/selectionFilterStore.ts +++ b/monitoring/monitoring_ui/src/store/selectionFilterStore.ts @@ -49,6 +49,26 @@ class SelectionFilterStore { } } + public clearSourceFilter(): void { + this.internalState.source = null; + this.internalState.fromPipelineStepId = null; + this.internalState.toPipelineStepId = null; + this.internalState.pipelineFilterRelation = 'and'; + this.internalState.receiverId = null; + } + + public setFilterSource(source: string | null): void { + if (source == null) { + this.clearSourceFilter(); + return; + } + this.internalState.source = source; + this.internalState.fromPipelineStepId = null; + this.internalState.toPipelineStepId = null; + this.internalState.pipelineFilterRelation = 'or'; + this.internalState.receiverId = null; + } + public setFilterSourceWithPipeline(source: string, fromPipelineStepId: string, toPipelineStepId: string): void { this.internalState.source = source; this.internalState.fromPipelineStepId = fromPipelineStepId; diff --git a/receiver/src/monitoring/receiver_monitoring_client_impl.cpp b/receiver/src/monitoring/receiver_monitoring_client_impl.cpp index 3c98a45d4dadc8e475ee0a204867c26aa33b65f1..0ce725f376911e996533fe40ceefcfcadfd9b207 100644 --- a/receiver/src/monitoring/receiver_monitoring_client_impl.cpp +++ b/receiver/src/monitoring/receiver_monitoring_client_impl.cpp @@ -221,6 +221,8 @@ asapo::Error asapo::ReceiverMonitoringClientImpl::ReinitializeClient() { newChannel.reset(); } + StartSendingThread(); + return nullptr; }