K Frequent in Sorted stream
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
public class KFrequentDataStream {
class ArrayListStream implements SortedStream {
List<Integer> data;
int index = -1;
ArrayListStream(List<Integer> data) {
this.data = data;
}
@Override
public boolean move() {
return (++index < data.size());
}
@Override
public int value() {
return data.get(index);
}
}
public interface SortedStream {
/*
* Positions the stream at the next valid integer.
* @return true iff the stream was positioned at a valid integer.
*/
boolean move();
/*
*
*@return the integer at the current position.
*@throws RuntimeException if called before calling move() at least once or called
* after move() returned false.
*/
int value();
}
//繁琐
public static List<Integer> getNumbersInAtLeastKStreams(List<ArrayListStream> streams, int k) {
Map<Integer,Integer> map = new HashMap<>();
List<Integer> result = new ArrayList<>();
if(streams == null || streams.size() ==0 || k < 0){
return result;
}
for(ArrayListStream stream : streams) {
Set<Integer> curSet = new HashSet<>();
while(stream.move()){
int current = stream.value();
if(!curSet.contains(current)){
curSet.add(current);
if(map.containsKey(current)){
map.put(current, map.get(current)+1);
if(map.get(current) >= k && !result.contains(current)){
result.add(current);
}
} else {
map.put(current,1);
}
}
}
}
return result;
}
// public SortedStream readyFromSoruece (SortedStream largeSteam , int size) {
// SortedStream shortSteam = new SortedStream( new int[size]);
// int index = 0;
// int[] data = new int[size];
// while( i< size0 && shortSteam.move() ) {
// data[i++] = shortSteam.value();
// }
// return shortStream;
// }
public static void main(String[] args) {
List<SortedStream> testList = new ArrayList<>();
int[] data1 = {-1, 3, 5, 6, 6, 7, 9, 12, 14,51};
SortedStream stream1 = new ArrayBackedStream(data1);
testList.add(stream1);
int[] data2 = {0, 3, 5, 5, 9, 13, 15,51};
SortedStream stream2 = new ArrayBackedStream(data2);
testList.add(stream2);
int[] data3 = {5, 5, 7,11, 19, 21, 51};
SortedStream stream3 = new ArrayBackedStream(data3);
testList.add(stream3);
System.out.println(find(testList,3));
System.out.println(findMoreThanKTimes(testList,2));
System.out.println(getNumbersInAtLeastKStreams(testList,2));
}
//-1 3 5 6 6 7 9 12 14
//0 3 5 5 9 13 15
//5 5 7 11 19 21 51
//k = 3
//streams.size() (m) is small
//but number of elements in stream is large
//3 5 7 9
//Imhotep
public static List<Integer> find(List<ArrayListStream> list, int k){
List<Integer> result = new ArrayList<>();
PriorityQueue<ArrayListStream> heap = new PriorityQueue<>(11,new SortedStreamComparator());
for(ArrayListStream st:list){
if(st.move()){
heap.offer(st);
}
}
if (heap.isEmpty()) {
return result;
}
ArrayListStream curStream = heap.poll();
int curVal = curStream.value(), count = 1;
//delete same
while (curStream.move()) {
int val = curStream.value();
if (val == curVal) {
continue;
}
heap.add(curStream);
break;
}
//下一步
while (!heap.isEmpty()) {
curStream = heap.peek();
if (curStream.value() == curVal) {
count++;
}else {
if (count >= k) {
result.add(curVal);
}
if (heap.size() < k) {
break;
}
curVal = curStream.value();
count = 1;
}
//delete same
curStream = heap.poll();
while (curStream.move()) {
int val = curStream.value();
if (val == curVal) {
continue;
}
heap.offer(curStream);
break;
}
}
//最后一步
if (count >= k) {
result.add(curVal);
}
return result;
}
public static class SortedStreamComparator implements Comparator<ArrayListStream> {
@Override
public int compare(ArrayListStream s1, ArrayListStream s2){
return s1.value() - s2.value();
}
}
}