Python样例
样例一
函数原型:
def SendData(streamName: bytes, inPluginId: int, dataInput: MxDataInput) -> int:
# -*- coding:utf-8 -*-
"""Sample 1: send one image to a stream by input plugin index and print the result.

Flow: initialise the stream manager, create the streams described by the
pipeline file, send the bytes of ``test.jpg`` with ``SendData``, fetch the
result with ``GetResult``, print it, then destroy all streams.
"""
# import StreamManagerApi.py
from StreamManagerApi import *
import MxpiDataType_pb2 as MxpiDataType

if __name__ == '__main__':
    # init stream manager
    streamManagerApi = StreamManagerApi()
    ret = streamManagerApi.InitManager()
    if ret != 0:
        print("Failed to init Stream manager, ret=%s" % str(ret))
        exit()

    # create streams by pipeline config file
    with open("../pipeline/Sample.pipeline", 'rb') as f:
        pipelineStr = f.read()
    ret = streamManagerApi.CreateMultipleStreams(pipelineStr)
    if ret != 0:
        print("Failed to create Stream, ret=%s" % str(ret))
        exit()

    # Construct the input of the stream
    dataInput = MxDataInput()
    with open("test.jpg", 'rb') as f:
        dataInput.data = f.read()
    # print(dataInput.data)

    # The following is how to set the dataInput.roiBoxs
    #
    #     roiVector = RoiBoxVector()
    #     roi = RoiBox()
    #     roi.x0 = 100
    #     roi.y0 = 100
    #     roi.x1 = 200
    #     roi.y1 = 200
    #     roiVector.push_back(roi)
    #     dataInput.roiBoxs = roiVector

    # Inputs data to a specified stream based on streamName.
    streamName = b'classification+detection'
    inPluginId = 0
    # NOTE(review): the return value is kept in `uniqueId` as in the original
    # sample, but GetResult below is keyed by inPluginId, not by this value.
    # Confirm what this SendData overload returns and whether `< 0` is the
    # right failure test.
    uniqueId = streamManagerApi.SendData(streamName, inPluginId, dataInput)
    if uniqueId < 0:
        print("Failed to send data to stream.")
        exit()

    # Obtain the inference result by specifying streamName and the same
    # plugin index that was passed to SendData.
    inferResult = streamManagerApi.GetResult(streamName, inPluginId)
    if inferResult.errorCode != 0:
        # The message names the API that is actually called (GetResult).
        print("GetResult error. errorCode=%d, errorMsg=%s" % (
            inferResult.errorCode, inferResult.data.decode()))
        exit()

    # print the infer result
    print(inferResult.data.decode())

    # destroy streams
    streamManagerApi.DestroyAllStreams()
样例二
函数原型:
def SendData(streamName: bytes, elementName: bytes, dataInput: MxDataInput) -> int:
# Sample 2: address the input element by name instead of by plugin index.
# STREAM_NAME, stream_manager_api and data_input are not defined in this
# snippet; they must be provided by the surrounding code.
ELEMENT_NAME = b'appsrc0'

# Send the prepared input to the named element of the stream.
unique_id = stream_manager_api.SendData(STREAM_NAME, ELEMENT_NAME, data_input)
if unique_id < 0:
    print("Failed to send data to stream.")
    exit()

# Fetch the result, passing the value returned by SendData as the second argument.
infer_result = stream_manager_api.GetResult(STREAM_NAME, unique_id)
if infer_result.errorCode != 0:
    print(
        "GetResult error. errorCode=%d, errorMsg=%s"
        % (infer_result.errorCode, infer_result.data.decode())
    )
    exit()

# The result payload is bytes; decode it before printing.
print("result: {}".format(infer_result.data.decode()))
样例三
函数原型:
def SendData(streamName: bytes, elementName: bytes, metadataVec: MetadataInputVector, databuffer: MxBufferInput) -> int:
# Sample 3: send a metadata vector plus a buffer to a named input element,
# then read the result from a named output element.
# STREAM_NAME, stream_manager_api, metedata_vec and buffer_input are not
# defined in this snippet; they must be provided by the surrounding code.
ELEMENT_NAME = b'appsrc0'

error_code = stream_manager_api.SendData(
    STREAM_NAME, ELEMENT_NAME, metedata_vec, buffer_input
)
if error_code < 0:
    print("Failed to send data to stream.")
    exit()

# Names passed to GetResult as its third argument.
data_source_vector = StringVector()
data_source_vector.push_back(b"appsrc0")

infer_result = stream_manager_api.GetResult(
    STREAM_NAME, b'appsink0', data_source_vector
)
if infer_result.errorCode != 0:
    print("GetResult failed")
    exit()

# Guard against a result that carries no buffer payload before decoding it.
if infer_result.bufferOutput.data is None:
    print("bufferOutput nullptr")
    exit()

print("result: {}".format(infer_result.bufferOutput.data.decode()))